Conditionally change max_age

This commit is contained in:
Ryo Onodera
2020-04-13 15:07:38 +09:00
committed by Michael Vines
parent d888e0a6d7
commit 5f1c637508
4 changed files with 89 additions and 12 deletions

View File

@ -26,6 +26,7 @@ use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{
clock::Slot, clock::Slot,
genesis_config::GenesisConfig,
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
@ -80,6 +81,7 @@ pub struct ReplayStageConfig {
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>, pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>, pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub genesis_config: GenesisConfig,
} }
pub struct ReplayStage { pub struct ReplayStage {
@ -183,6 +185,7 @@ impl ReplayStage {
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
genesis_config,
} = config; } = config;
let (root_bank_sender, root_bank_receiver) = channel(); let (root_bank_sender, root_bank_receiver) = channel();
@ -246,6 +249,7 @@ impl ReplayStage {
&slot_full_senders, &slot_full_senders,
transaction_status_sender.clone(), transaction_status_sender.clone(),
&verify_recyclers, &verify_recyclers,
&genesis_config,
); );
datapoint_debug!( datapoint_debug!(
"replay_stage-memory", "replay_stage-memory",
@ -551,6 +555,7 @@ impl ReplayStage {
bank_progress: &mut ForkProgress, bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
genesis_config: &GenesisConfig,
) -> result::Result<usize, BlockstoreProcessorError> { ) -> result::Result<usize, BlockstoreProcessorError> {
let tx_count_before = bank_progress.replay_progress.num_txs; let tx_count_before = bank_progress.replay_progress.num_txs;
let confirm_result = blockstore_processor::confirm_slot( let confirm_result = blockstore_processor::confirm_slot(
@ -562,6 +567,7 @@ impl ReplayStage {
transaction_status_sender, transaction_status_sender,
None, None,
verify_recyclers, verify_recyclers,
Some(genesis_config),
); );
let tx_count_after = bank_progress.replay_progress.num_txs; let tx_count_after = bank_progress.replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before; let tx_count = tx_count_after - tx_count_before;
@ -737,6 +743,7 @@ impl ReplayStage {
slot_full_senders: &[Sender<(u64, Pubkey)>], slot_full_senders: &[Sender<(u64, Pubkey)>],
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers, verify_recyclers: &VerifyRecyclers,
genesis_config: &GenesisConfig,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -765,6 +772,7 @@ impl ReplayStage {
bank_progress, bank_progress,
transaction_status_sender.clone(), transaction_status_sender.clone(),
verify_recyclers, verify_recyclers,
genesis_config,
); );
match replay_result { match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count, Ok(replay_tx_count) => tx_count += replay_tx_count,
@ -1709,6 +1717,7 @@ pub(crate) mod tests {
&mut bank0_progress, &mut bank0_progress,
None, None,
&VerifyRecyclers::default(), &VerifyRecyclers::default(),
&genesis_config,
); );
// Check that the erroring bank was marked as dead in the progress map // Check that the erroring bank was marked as dead in the progress map

View File

@ -26,6 +26,7 @@ use solana_ledger::{
snapshot_package::SnapshotPackageSender, snapshot_package::SnapshotPackageSender,
}; };
use solana_sdk::{ use solana_sdk::{
genesis_config::GenesisConfig,
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
}; };
@ -67,6 +68,7 @@ pub struct TvuConfig {
pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub trusted_validators: Option<HashSet<Pubkey>>, pub trusted_validators: Option<HashSet<Pubkey>>,
pub accounts_hash_fault_injection_slots: u64, pub accounts_hash_fault_injection_slots: u64,
pub genesis_config: GenesisConfig,
} }
impl Tvu { impl Tvu {
@ -185,6 +187,7 @@ impl Tvu {
block_commitment_cache: block_commitment_cache.clone(), block_commitment_cache: block_commitment_cache.clone(),
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
genesis_config: tvu_config.genesis_config,
}; };
let (replay_stage, root_bank_receiver) = ReplayStage::new( let (replay_stage, root_bank_receiver) = ReplayStage::new(

View File

@ -316,7 +316,7 @@ impl Validator {
std::thread::park(); std::thread::park();
} }
let poh_config = Arc::new(genesis_config.poh_config); let poh_config = Arc::new(genesis_config.poh_config.clone());
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(), bank.tick_height(),
bank.last_blockhash(), bank.last_blockhash(),
@ -443,6 +443,7 @@ impl Validator {
shred_version: node.info.shred_version, shred_version: node.info.shred_version,
trusted_validators: config.trusted_validators.clone(), trusted_validators: config.trusted_validators.clone(),
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots, accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
genesis_config,
}, },
); );

View File

@ -20,8 +20,8 @@ use solana_runtime::{
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
}; };
use solana_sdk::{ use solana_sdk::{
clock::{Slot, MAX_PROCESSING_AGE}, clock::{Slot, MAX_PROCESSING_AGE, MAX_RECENT_BLOCKHASHES},
genesis_config::GenesisConfig, genesis_config::{GenesisConfig, OperatingMode},
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
signature::Keypair, signature::Keypair,
@ -57,11 +57,22 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
Ok(()) Ok(())
} }
const MAX_AGE_CORRECTION_EPOCH: u64 = 14;
fn execute_batch( fn execute_batch(
batch: &TransactionBatch, batch: &TransactionBatch,
bank: &Arc<Bank>, bank: &Arc<Bank>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
genesis_config: Option<&GenesisConfig>,
) -> Result<()> { ) -> Result<()> {
// See https://github.com/solana-labs/solana/pull/9423
let max_age_reduced = if let Some(genesis_config) = genesis_config {
genesis_config.operating_mode == OperatingMode::Stable
&& bank.epoch() >= MAX_AGE_CORRECTION_EPOCH
} else {
false
};
let ( let (
TransactionResults { TransactionResults {
fee_collection_results, fee_collection_results,
@ -70,7 +81,11 @@ fn execute_batch(
balances, balances,
) = batch.bank().load_execute_and_commit_transactions( ) = batch.bank().load_execute_and_commit_transactions(
batch, batch,
MAX_PROCESSING_AGE, if max_age_reduced {
MAX_PROCESSING_AGE
} else {
MAX_RECENT_BLOCKHASHES
},
transaction_status_sender.is_some(), transaction_status_sender.is_some(),
); );
@ -112,6 +127,7 @@ fn execute_batches(
batches: &[TransactionBatch], batches: &[TransactionBatch],
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
genesis_config: Option<&GenesisConfig>,
) -> Result<()> { ) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| { let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
@ -119,7 +135,7 @@ fn execute_batches(
batches batches
.into_par_iter() .into_par_iter()
.map_with(transaction_status_sender, |sender, batch| { .map_with(transaction_status_sender, |sender, batch| {
let result = execute_batch(batch, bank, sender.clone()); let result = execute_batch(batch, bank, sender.clone(), genesis_config);
if let Some(entry_callback) = entry_callback { if let Some(entry_callback) = entry_callback {
entry_callback(bank); entry_callback(bank);
} }
@ -143,7 +159,14 @@ pub fn process_entries(
randomize: bool, randomize: bool,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<()> { ) -> Result<()> {
process_entries_with_callback(bank, entries, randomize, None, transaction_status_sender) process_entries_with_callback(
bank,
entries,
randomize,
None,
transaction_status_sender,
Some(&GenesisConfig::default()),
)
} }
fn process_entries_with_callback( fn process_entries_with_callback(
@ -152,6 +175,7 @@ fn process_entries_with_callback(
randomize: bool, randomize: bool,
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
genesis_config: Option<&GenesisConfig>,
) -> Result<()> { ) -> Result<()> {
// accumulator for entries that can be processed in parallel // accumulator for entries that can be processed in parallel
let mut batches = vec![]; let mut batches = vec![];
@ -168,6 +192,7 @@ fn process_entries_with_callback(
&batches, &batches,
entry_callback, entry_callback,
transaction_status_sender.clone(), transaction_status_sender.clone(),
genesis_config,
)?; )?;
batches.clear(); batches.clear();
for hash in &tick_hashes { for hash in &tick_hashes {
@ -223,12 +248,19 @@ fn process_entries_with_callback(
&batches, &batches,
entry_callback, entry_callback,
transaction_status_sender.clone(), transaction_status_sender.clone(),
genesis_config,
)?; )?;
batches.clear(); batches.clear();
} }
} }
} }
execute_batches(bank, &batches, entry_callback, transaction_status_sender)?; execute_batches(
bank,
&batches,
entry_callback,
transaction_status_sender,
genesis_config,
)?;
for hash in tick_hashes { for hash in tick_hashes {
bank.register_tick(&hash); bank.register_tick(&hash);
} }
@ -363,6 +395,7 @@ pub fn process_blockstore_from_root(
&mut rooted_path, &mut rooted_path,
opts, opts,
recyclers, recyclers,
genesis_config,
)?; )?;
let (banks, bank_forks_info): (Vec<_>, Vec<_>) = let (banks, bank_forks_info): (Vec<_>, Vec<_>) =
fork_info.into_iter().map(|(_, v)| v).unzip(); fork_info.into_iter().map(|(_, v)| v).unzip();
@ -456,6 +489,7 @@ fn confirm_full_slot(
last_entry_hash: &Hash, last_entry_hash: &Hash,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
genesis_config: Option<&GenesisConfig>,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
let mut timing = ConfirmationTiming::default(); let mut timing = ConfirmationTiming::default();
let mut progress = ConfirmationProgress::new(*last_entry_hash); let mut progress = ConfirmationProgress::new(*last_entry_hash);
@ -469,6 +503,7 @@ fn confirm_full_slot(
None, None,
opts.entry_callback.as_ref(), opts.entry_callback.as_ref(),
recyclers, recyclers,
genesis_config,
)?; )?;
if !bank.is_complete() { if !bank.is_complete() {
@ -527,6 +562,7 @@ pub fn confirm_slot(
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
entry_callback: Option<&ProcessCallback>, entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
genesis_config: Option<&GenesisConfig>,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot(); let slot = bank.slot();
@ -592,6 +628,7 @@ pub fn confirm_slot(
true, true,
entry_callback, entry_callback,
transaction_status_sender, transaction_status_sender,
genesis_config,
) )
.map_err(BlockstoreProcessorError::from); .map_err(BlockstoreProcessorError::from);
replay_elapsed.stop(); replay_elapsed.stop();
@ -625,7 +662,14 @@ fn process_bank_0(
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
assert_eq!(bank0.slot(), 0); assert_eq!(bank0.slot(), 0);
confirm_full_slot(blockstore, bank0, &bank0.last_blockhash(), opts, recyclers) confirm_full_slot(
blockstore,
bank0,
&bank0.last_blockhash(),
opts,
recyclers,
None,
)
.expect("processing for bank 0 must succceed"); .expect("processing for bank 0 must succceed");
bank0.freeze(); bank0.freeze();
Ok(()) Ok(())
@ -701,6 +745,7 @@ fn process_pending_slots(
rooted_path: &mut Vec<u64>, rooted_path: &mut Vec<u64>,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
genesis_config: &GenesisConfig,
) -> result::Result<HashMap<u64, (Arc<Bank>, BankForksInfo)>, BlockstoreProcessorError> { ) -> result::Result<HashMap<u64, (Arc<Bank>, BankForksInfo)>, BlockstoreProcessorError> {
let mut fork_info = HashMap::new(); let mut fork_info = HashMap::new();
let mut last_status_report = Instant::now(); let mut last_status_report = Instant::now();
@ -730,7 +775,16 @@ fn process_pending_slots(
let allocated = thread_mem_usage::Allocatedp::default(); let allocated = thread_mem_usage::Allocatedp::default();
let initial_allocation = allocated.get(); let initial_allocation = allocated.get();
if process_single_slot(blockstore, &bank, &last_entry_hash, opts, recyclers).is_err() { if process_single_slot(
blockstore,
&bank,
&last_entry_hash,
opts,
recyclers,
genesis_config,
)
.is_err()
{
continue; continue;
} }
@ -778,10 +832,19 @@ fn process_single_slot(
last_entry_hash: &Hash, last_entry_hash: &Hash,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
genesis_config: &GenesisConfig,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
// Mark corrupt slots as dead so validators don't replay this slot and // Mark corrupt slots as dead so validators don't replay this slot and
// see DuplicateSignature errors later in ReplayStage // see DuplicateSignature errors later in ReplayStage
confirm_full_slot(blockstore, bank, last_entry_hash, opts, recyclers).map_err(|err| { confirm_full_slot(
blockstore,
bank,
last_entry_hash,
opts,
recyclers,
Some(genesis_config),
)
.map_err(|err| {
let slot = bank.slot(); let slot = bank.slot();
blockstore blockstore
.set_dead_slot(slot) .set_dead_slot(slot)
@ -2444,6 +2507,7 @@ pub mod tests {
&bank0.last_blockhash(), &bank0.last_blockhash(),
&opts, &opts,
&recyclers, &recyclers,
None,
) )
.unwrap(); .unwrap();
bank1.squash(); bank1.squash();
@ -2609,7 +2673,7 @@ pub mod tests {
let entry = next_entry(&new_blockhash, 1, vec![tx]); let entry = next_entry(&new_blockhash, 1, vec![tx]);
entries.push(entry); entries.push(entry);
process_entries_with_callback(&bank0, &entries, true, None, None).unwrap(); process_entries_with_callback(&bank0, &entries, true, None, None, None).unwrap();
assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) assert_eq!(bank0.get_balance(&keypair.pubkey()), 1)
} }