diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index fa2a269955..82e5688056 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -5,20 +5,21 @@ use crate::{
     commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
     consensus::{StakeLockout, Tower},
     poh_recorder::PohRecorder,
-    result::{Error, Result},
+    result::Result,
     rpc_subscriptions::RpcSubscriptions,
 };
-use solana_ledger::entry::EntryVerificationStatus;
 use solana_ledger::{
     bank_forks::BankForks,
-    block_error::BlockError,
-    blockstore::{Blockstore, BlockstoreError},
-    blockstore_processor::{self, TransactionStatusSender},
-    entry::{Entry, EntrySlice, VerifyRecyclers},
+    blockstore::Blockstore,
+    blockstore_processor::{
+        self, BlockstoreProcessorError, ConfirmationProgress, ConfirmationTiming,
+        TransactionStatusSender,
+    },
+    entry::VerifyRecyclers,
     leader_schedule_cache::LeaderScheduleCache,
     snapshot_package::SnapshotPackageSender,
 };
-use solana_measure::{measure::Measure, thread_mem_usage};
+use solana_measure::thread_mem_usage;
 use solana_metrics::inc_new_counter_info;
 use solana_runtime::bank::Bank;
 use solana_sdk::{
@@ -32,6 +33,7 @@ use solana_sdk::{
 use solana_vote_program::vote_instruction;
 use std::{
     collections::{HashMap, HashSet},
+    result,
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::{channel, Receiver, RecvTimeoutError, Sender},
@@ -84,14 +86,18 @@ pub struct ReplayStage {
     commitment_service: AggregateCommitmentService,
 }
 
-struct ReplaySlotStats {
-    // Per-slot elapsed time
-    slot: Slot,
-    fetch_entries_elapsed: u64,
-    fetch_entries_fail_elapsed: u64,
-    entry_verification_elapsed: u64,
-    replay_elapsed: u64,
-    replay_start: Instant,
+#[derive(Default)]
+pub struct ReplaySlotStats(ConfirmationTiming);
+impl std::ops::Deref for ReplaySlotStats {
+    type Target = ConfirmationTiming;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+impl std::ops::DerefMut for ReplaySlotStats {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
 }
 
 #[derive(Debug, Clone, Default)]
@@ -112,66 +118,43 @@ struct ForkStats {
 }
 
 impl ReplaySlotStats {
-    pub fn new(slot: Slot) -> Self {
-        Self {
-            slot,
-            fetch_entries_elapsed: 0,
-            fetch_entries_fail_elapsed: 0,
-            entry_verification_elapsed: 0,
-            replay_elapsed: 0,
-            replay_start: Instant::now(),
-        }
-    }
-
-    pub fn report_stats(&self, total_entries: usize, total_shreds: usize) {
+    pub fn report_stats(&self, slot: Slot, num_entries: usize, num_shreds: u64) {
         datapoint_info!(
             "replay-slot-stats",
-            ("slot", self.slot as i64, i64),
-            ("fetch_entries_time", self.fetch_entries_elapsed as i64, i64),
+            ("slot", slot as i64, i64),
+            ("fetch_entries_time", self.fetch_elapsed as i64, i64),
             (
                 "fetch_entries_fail_time",
-                self.fetch_entries_fail_elapsed as i64,
-                i64
-            ),
-            (
-                "entry_verification_time",
-                self.entry_verification_elapsed as i64,
+                self.fetch_fail_elapsed as i64,
                 i64
             ),
+            ("entry_verification_time", self.verify_elapsed as i64, i64),
             ("replay_time", self.replay_elapsed as i64, i64),
             (
                 "replay_total_elapsed",
-                self.replay_start.elapsed().as_micros() as i64,
+                self.started.elapsed().as_micros() as i64,
                 i64
             ),
-            ("total_entries", total_entries as i64, i64),
-            ("total_shreds", total_shreds as i64, i64),
+            ("total_entries", num_entries as i64, i64),
+            ("total_shreds", num_shreds as i64, i64),
         );
     }
 }
 
 struct ForkProgress {
-    last_entry: Hash,
-    num_shreds: usize,
-    num_entries: usize,
-    tick_hash_count: u64,
-    started_ms: u64,
     is_dead: bool,
-    stats: ReplaySlotStats,
     fork_stats: ForkStats,
+    replay_stats: ReplaySlotStats,
+    replay_progress: ConfirmationProgress,
 }
 
 impl ForkProgress {
-    pub fn new(slot: Slot, last_entry: Hash) -> Self {
+    pub fn new(last_entry: Hash) -> Self {
         Self {
-            last_entry,
-            num_shreds: 0,
-            num_entries: 0,
-            tick_hash_count: 0,
-            started_ms: timing::timestamp(),
             is_dead: false,
-            stats: ReplaySlotStats::new(slot),
             fork_stats: ForkStats::default(),
+            replay_stats: ReplaySlotStats::default(),
+            replay_progress: ConfirmationProgress::new(last_entry),
        }
    }
 }
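The `ReplaySlotStats` rewrite above leans on `Deref`/`DerefMut` so existing call sites keep field access like `stats.replay_elapsed` while the data now lives in `ConfirmationTiming`. A minimal, self-contained sketch of the pattern (`Timing` is a simplified stand-in for `ConfirmationTiming`, not the real type):

use std::ops::{Deref, DerefMut};
use std::time::Instant;

// Simplified stand-in for blockstore_processor::ConfirmationTiming.
struct Timing {
    started: Instant,
    replay_elapsed: u64,
}

// Newtype wrapper, mirroring ReplaySlotStats(ConfirmationTiming) above.
struct Stats(Timing);

impl Deref for Stats {
    type Target = Timing;
    fn deref(&self) -> &Timing {
        &self.0
    }
}

impl DerefMut for Stats {
    fn deref_mut(&mut self) -> &mut Timing {
        &mut self.0
    }
}

fn main() {
    let mut stats = Stats(Timing {
        started: Instant::now(),
        replay_elapsed: 0,
    });
    stats.replay_elapsed += 42; // DerefMut: write an inner field through the wrapper
    println!(
        "replayed for {} us, started {:?} ago",
        stats.replay_elapsed, // Deref: read through the wrapper
        stats.started.elapsed()
    );
}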
@@ -217,10 +200,7 @@ impl ReplayStage {
         let mut progress = HashMap::new();
         // Initialize progress map with any root banks
         for bank in bank_forks.read().unwrap().frozen_banks().values() {
-            progress.insert(
-                bank.slot(),
-                ForkProgress::new(bank.slot(), bank.last_blockhash()),
-            );
+            progress.insert(bank.slot(), ForkProgress::new(bank.last_blockhash()));
         }
         let mut current_leader = None;
         let mut last_reset = Hash::default();
@@ -525,83 +505,43 @@
         }
     }
 
-    // Returns Some(result) if the `result` is a fatal error, which is an error that will cause a
-    // bank to be marked as dead/corrupted
-    fn is_replay_result_fatal(result: &Result<()>) -> bool {
-        match result {
-            Err(Error::TransactionError(e)) => {
-                // Transactions with transaction errors mean this fork is bogus
-                let tx_error = Err(e.clone());
-                !Bank::can_commit(&tx_error)
-            }
-            Err(Error::BlockError(_)) => true,
-            Err(Error::BlockstoreError(BlockstoreError::InvalidShredData(_))) => true,
-            Err(Error::BlockstoreError(BlockstoreError::DeadSlot)) => true,
-            _ => false,
-        }
-    }
-
-    // Returns the replay result and the number of replayed transactions
     fn replay_blockstore_into_bank(
         bank: &Arc<Bank>,
         blockstore: &Blockstore,
         bank_progress: &mut ForkProgress,
         transaction_status_sender: Option<TransactionStatusSender>,
         verify_recyclers: &VerifyRecyclers,
-    ) -> (Result<()>, usize) {
-        let mut tx_count = 0;
-        let now = Instant::now();
-        let load_result =
-            Self::load_blockstore_entries_with_shred_info(bank, blockstore, bank_progress);
-        let fetch_entries_elapsed = now.elapsed().as_micros();
-        if load_result.is_err() {
-            bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
-        } else {
-            bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64;
-        }
+    ) -> result::Result<usize, BlockstoreProcessorError> {
+        let tx_count_before = bank_progress.replay_progress.num_txs;
+        let confirm_result = blockstore_processor::confirm_slot(
+            blockstore,
+            bank,
+            &mut bank_progress.replay_stats,
+            &mut bank_progress.replay_progress,
+            false,
+            transaction_status_sender,
+            None,
+            verify_recyclers,
+        );
+        let tx_count_after = bank_progress.replay_progress.num_txs;
+        let tx_count = tx_count_after - tx_count_before;
 
-        let replay_result = load_result.and_then(|(entries, num_shreds, slot_full)| {
-            trace!(
-                "Fetch entries for slot {}, {:?} entries, num shreds {}, slot_full: {}",
-                bank.slot(),
-                entries.len(),
-                num_shreds,
-                slot_full,
-            );
-            tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
-            Self::replay_entries_into_bank(
-                bank,
-                bank_progress,
-                entries,
-                num_shreds,
-                slot_full,
-                transaction_status_sender,
-                verify_recyclers,
-            )
-        });
-
-        if Self::is_replay_result_fatal(&replay_result) {
-            warn!(
-                "Fatal replay result in slot: {}, result: {:?}",
-                bank.slot(),
-                replay_result
-            );
+        confirm_result.map_err(|err| {
+            let slot = bank.slot();
+            warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
             datapoint_error!(
                 "replay-stage-mark_dead_slot",
-                ("error", format!("error: {:?}", replay_result), String),
-                ("slot", bank.slot(), i64)
+                ("error", format!("error: {:?}", err), String),
+                ("slot", slot, i64)
             );
-            Self::mark_dead_slot(bank.slot(), blockstore, bank_progress);
-        }
+            bank_progress.is_dead = true;
+            blockstore
+                .set_dead_slot(slot)
+                .expect("Failed to mark slot as dead in blockstore");
+            err
+        })?;
 
-        (replay_result, tx_count)
-    }
-
-    fn mark_dead_slot(slot: Slot, blockstore: &Blockstore, bank_progress: &mut ForkProgress) {
-        bank_progress.is_dead = true;
-        blockstore
-            .set_dead_slot(slot)
-            .expect("Failed to mark slot as dead in blockstore");
+        Ok(tx_count)
     }
 
     #[allow(clippy::too_many_arguments)]
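The new control flow hinges on `Result::map_err` running a side effect (marking the slot dead) on the error path before `?` propagates the error. The same shape in isolation; `Store`, `confirm`, and `ReplayError` are hypothetical stand-ins, not Solana APIs:

#[derive(Debug)]
struct ReplayError;

struct Store;

impl Store {
    fn set_dead_slot(&self, slot: u64) {
        println!("slot {} marked dead", slot);
    }
}

fn confirm(ok: bool) -> Result<usize, ReplayError> {
    if ok {
        Ok(7)
    } else {
        Err(ReplayError)
    }
}

fn replay(store: &Store, slot: u64, ok: bool) -> Result<usize, ReplayError> {
    let tx_count = confirm(ok).map_err(|err| {
        store.set_dead_slot(slot); // side effect runs only on the error path
        err // hand the unchanged error back so `?` can propagate it
    })?;
    Ok(tx_count)
}

fn main() {
    assert_eq!(replay(&Store, 1, true).unwrap(), 7);
    assert!(replay(&Store, 2, false).is_err());
}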
{:?}", err), String), + ("slot", slot, i64) ); - Self::mark_dead_slot(bank.slot(), blockstore, bank_progress); - } + bank_progress.is_dead = true; + blockstore + .set_dead_slot(slot) + .expect("Failed to mark slot as dead in blockstore"); + err + })?; - (replay_result, tx_count) - } - - fn mark_dead_slot(slot: Slot, blockstore: &Blockstore, bank_progress: &mut ForkProgress) { - bank_progress.is_dead = true; - blockstore - .set_dead_slot(slot) - .expect("Failed to mark slot as dead in blockstore"); + Ok(tx_count) } #[allow(clippy::too_many_arguments)] @@ -758,30 +698,32 @@ impl ReplayStage { // this bank in `select_fork()` let bank_progress = &mut progress .entry(bank.slot()) - .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); if bank.collector_id() != my_pubkey { - let (replay_result, replay_tx_count) = Self::replay_blockstore_into_bank( + let replay_result = Self::replay_blockstore_into_bank( &bank, &blockstore, bank_progress, transaction_status_sender.clone(), verify_recyclers, ); - tx_count += replay_tx_count; - if Self::is_replay_result_fatal(&replay_result) { - trace!("replay_result_fatal slot {}", bank_slot); - // If the bank was corrupted, don't try to run the below logic to check if the - // bank is completed - continue; + match replay_result { + Ok(replay_tx_count) => tx_count += replay_tx_count, + Err(err) => { + trace!("replay_result err: {:?}, slot {}", err, bank_slot); + // If the bank was corrupted, don't try to run the below logic to check if the + // bank is completed + continue; + } } } assert_eq!(*bank_slot, bank.slot()); - if bank.tick_height() == bank.max_tick_height() { - if let Some(bank_progress) = &mut progress.get(&bank.slot()) { - bank_progress - .stats - .report_stats(bank_progress.num_entries, bank_progress.num_shreds); - } + if bank.is_complete() { + bank_progress.replay_stats.report_stats( + bank.slot(), + bank_progress.replay_progress.num_entries, + bank_progress.replay_progress.num_shreds, + ); did_complete_bank = true; Self::process_completed_bank(my_pubkey, bank, slot_full_senders); } else { @@ -939,7 +881,7 @@ impl ReplayStage { ) { for (slot, prog) in progress.iter_mut() { if !prog.fork_stats.confirmation_reported { - let duration = timing::timestamp() - prog.started_ms; + let duration = prog.replay_stats.started.elapsed().as_millis(); if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) && bank_forks .read() @@ -963,141 +905,6 @@ impl ReplayStage { } } - fn load_blockstore_entries_with_shred_info( - bank: &Bank, - blockstore: &Blockstore, - bank_progress: &mut ForkProgress, - ) -> Result<(Vec, usize, bool)> { - blockstore - .get_slot_entries_with_shred_info(bank.slot(), bank_progress.num_shreds as u64) - .map_err(|err| err.into()) - } - - fn replay_entries_into_bank( - bank: &Arc, - bank_progress: &mut ForkProgress, - entries: Vec, - num_shreds: usize, - slot_full: bool, - transaction_status_sender: Option, - verify_recyclers: &VerifyRecyclers, - ) -> Result<()> { - let result = Self::verify_and_process_entries( - &bank, - &entries, - slot_full, - bank_progress.num_shreds, - bank_progress, - transaction_status_sender, - verify_recyclers, - ); - bank_progress.num_shreds += num_shreds; - bank_progress.num_entries += entries.len(); - if let Some(last_entry) = entries.last() { - bank_progress.last_entry = last_entry.hash; - } - - result - } - - fn verify_ticks( - bank: &Arc, - entries: &[Entry], - slot_full: bool, - tick_hash_count: &mut u64, - ) -> 
@@ -963,141 +905,6 @@
         }
     }
 
-    fn load_blockstore_entries_with_shred_info(
-        bank: &Bank,
-        blockstore: &Blockstore,
-        bank_progress: &mut ForkProgress,
-    ) -> Result<(Vec<Entry>, usize, bool)> {
-        blockstore
-            .get_slot_entries_with_shred_info(bank.slot(), bank_progress.num_shreds as u64)
-            .map_err(|err| err.into())
-    }
-
-    fn replay_entries_into_bank(
-        bank: &Arc<Bank>,
-        bank_progress: &mut ForkProgress,
-        entries: Vec<Entry>,
-        num_shreds: usize,
-        slot_full: bool,
-        transaction_status_sender: Option<TransactionStatusSender>,
-        verify_recyclers: &VerifyRecyclers,
-    ) -> Result<()> {
-        let result = Self::verify_and_process_entries(
-            &bank,
-            &entries,
-            slot_full,
-            bank_progress.num_shreds,
-            bank_progress,
-            transaction_status_sender,
-            verify_recyclers,
-        );
-        bank_progress.num_shreds += num_shreds;
-        bank_progress.num_entries += entries.len();
-        if let Some(last_entry) = entries.last() {
-            bank_progress.last_entry = last_entry.hash;
-        }
-
-        result
-    }
-
-    fn verify_ticks(
-        bank: &Arc<Bank>,
-        entries: &[Entry],
-        slot_full: bool,
-        tick_hash_count: &mut u64,
-    ) -> std::result::Result<(), BlockError> {
-        let next_bank_tick_height = bank.tick_height() + entries.tick_count();
-        let max_bank_tick_height = bank.max_tick_height();
-        if next_bank_tick_height > max_bank_tick_height {
-            return Err(BlockError::InvalidTickCount);
-        }
-
-        if next_bank_tick_height < max_bank_tick_height && slot_full {
-            return Err(BlockError::InvalidTickCount);
-        }
-
-        if next_bank_tick_height == max_bank_tick_height {
-            let has_trailing_entry = !entries.last().unwrap().is_tick();
-            if has_trailing_entry {
-                return Err(BlockError::TrailingEntry);
-            }
-
-            if !slot_full {
-                return Err(BlockError::InvalidLastTick);
-            }
-        }
-
-        let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
-        if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
-            return Err(BlockError::InvalidTickHashCount);
-        }
-
-        Ok(())
-    }
-
-    fn verify_and_process_entries(
-        bank: &Arc<Bank>,
-        entries: &[Entry],
-        slot_full: bool,
-        shred_index: usize,
-        bank_progress: &mut ForkProgress,
-        transaction_status_sender: Option<TransactionStatusSender>,
-        recyclers: &VerifyRecyclers,
-    ) -> Result<()> {
-        let last_entry = &bank_progress.last_entry;
-        let tick_hash_count = &mut bank_progress.tick_hash_count;
-        let handle_block_error = move |block_error: BlockError| -> Result<()> {
-            warn!(
-                "{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}, slot_full: {}",
-                block_error,
-                bank.slot(),
-                entries.len(),
-                bank.tick_height(),
-                last_entry,
-                bank.last_blockhash(),
-                shred_index,
-                slot_full,
-            );
-
-            datapoint_error!(
-                "replay-stage-block-error",
-                ("slot", bank.slot(), i64),
-                ("last_entry", last_entry.to_string(), String),
-            );
-
-            Err(Error::BlockError(block_error))
-        };
-
-        if let Err(block_error) = Self::verify_ticks(bank, entries, slot_full, tick_hash_count) {
-            return handle_block_error(block_error);
-        }
-
-        datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
-        let mut verify_total = Measure::start("verify_and_process_entries");
-        let mut entry_state = entries.start_verify(last_entry, recyclers.clone());
-
-        if entry_state.status() == EntryVerificationStatus::Failure {
-            return handle_block_error(BlockError::InvalidEntryHash);
-        }
-
-        let mut replay_elapsed = Measure::start("replay_elapsed");
-        let res =
-            blockstore_processor::process_entries(bank, entries, true, transaction_status_sender);
-        replay_elapsed.stop();
-        bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
-
-        if !entry_state.finish_verify(entries) {
-            return handle_block_error(BlockError::InvalidEntryHash);
-        }
-
-        verify_total.stop();
-        bank_progress.stats.entry_verification_elapsed =
-            verify_total.as_us() - replay_elapsed.as_us();
-
-        res?;
-        Ok(())
-    }
-
     fn handle_new_root(
         bank_forks: &Arc<RwLock<BankForks>>,
         progress: &mut HashMap<u64, ForkProgress>,
@@ -1192,10 +999,11 @@ pub(crate) mod tests {
     use crossbeam_channel::unbounded;
     use solana_client::rpc_request::RpcEncodedTransaction;
     use solana_ledger::{
+        block_error::BlockError,
        blockstore::make_slot_entries,
         blockstore::{entries_to_test_shreds, BlockstoreError},
         create_new_tmp_ledger,
-        entry::{self, next_entry},
+        entry::{self, next_entry, Entry},
         get_tmp_ledger_path,
         shred::{
             CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
@@ -1328,12 +1136,7 @@
         for fork_progress in fork_progresses.iter_mut() {
             fork_progress
                 .entry(neutral_fork.fork[0])
-                .or_insert_with(|| {
-                    ForkProgress::new(
-                        bank_forks.banks[&0].slot(),
-                        bank_forks.banks[&0].last_blockhash(),
-                    )
-                });
+                .or_insert_with(|| ForkProgress::new(bank_forks.banks[&0].last_blockhash()));
         }
 
         for index in 1..neutral_fork.fork.len() {
@@ -1360,7 +1163,6 @@
                     .entry(bank_forks.banks[&neutral_fork.fork[index]].slot())
                     .or_insert_with(|| {
                         ForkProgress::new(
-                            bank_forks.banks[&neutral_fork.fork[index]].slot(),
                             bank_forks.banks[&neutral_fork.fork[index]].last_blockhash(),
                         )
                     });
@@ -1404,7 +1206,6 @@
                         .entry(bank_forks.banks[&fork_info.fork[index]].slot())
                         .or_insert_with(|| {
                             ForkProgress::new(
-                                bank_forks.banks[&fork_info.fork[index]].slot(),
                                 bank_forks.banks[&fork_info.fork[index]].last_blockhash(),
                             )
                         });
@@ -1551,7 +1352,7 @@
         let bank0 = Bank::new(&genesis_config);
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
         let mut progress = HashMap::new();
-        progress.insert(5, ForkProgress::new(0, Hash::default()));
+        progress.insert(5, ForkProgress::new(Hash::default()));
         ReplayStage::handle_new_root(&bank_forks, &mut progress);
         assert!(progress.is_empty());
     }
@@ -1585,7 +1386,9 @@
 
         assert_matches!(
             res,
-            Err(Error::TransactionError(TransactionError::AccountNotFound))
+            Err(BlockstoreProcessorError::InvalidTransaction(
+                TransactionError::AccountNotFound
+            ))
         );
     }
 
@@ -1611,7 +1414,7 @@
             entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::InvalidEntryHash);
         } else {
             assert!(false);
@@ -1636,7 +1439,7 @@
             )
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::InvalidTickHashCount);
         } else {
             assert!(false);
@@ -1659,7 +1462,7 @@
             )
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::InvalidTickCount);
         } else {
             assert!(false);
@@ -1679,7 +1482,7 @@
             )
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::InvalidTickCount);
         } else {
             assert!(false);
@@ -1701,7 +1504,7 @@
             )
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::InvalidLastTick);
         } else {
             assert!(false);
@@ -1725,7 +1528,7 @@
             entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0)
         });
 
-        if let Err(Error::BlockError(block_error)) = res {
+        if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
             assert_eq!(block_error, BlockError::TrailingEntry);
         } else {
             assert!(false);
@@ -1755,13 +1558,15 @@
 
         assert_matches!(
            res,
-            Err(Error::BlockstoreError(BlockstoreError::InvalidShredData(_)))
+            Err(
+                BlockstoreProcessorError::FailedToLoadEntries(BlockstoreError::InvalidShredData(_)),
+            )
         );
     }
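A side effect of this change: `BlockstoreProcessorError` no longer derives `PartialEq` (its new `BlockstoreError` payload is not comparable), which is why the tests above match variants with `assert_matches!`/`if let` instead of comparing whole errors with `assert_eq!`. The same technique on a hypothetical error type:

// Hypothetical error enum; like BlockstoreProcessorError, it cannot derive
// PartialEq because std::io::Error does not implement it.
#[derive(Debug)]
enum ProcessorError {
    FailedToLoad(std::io::Error),
    #[allow(dead_code)]
    InvalidBlock,
}

fn load() -> Result<(), ProcessorError> {
    Err(ProcessorError::FailedToLoad(std::io::Error::new(
        std::io::ErrorKind::InvalidData,
        "bad shred",
    )))
}

fn main() {
    // Pattern-match the variant instead of comparing whole values.
    if let Err(ProcessorError::FailedToLoad(io_err)) = load() {
        assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidData);
    } else {
        panic!("expected FailedToLoad");
    }
    // Or, more compactly:
    assert!(matches!(load(), Err(ProcessorError::FailedToLoad(_))));
}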
 
     // Given a shred and a fatal expected error, check that replaying that shred causes the fork to be
     // marked as dead. Returns the error for caller to verify.
-    fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
+    fn check_dead_fork<F>(shred_to_insert: F) -> result::Result<(), BlockstoreProcessorError>
     where
         F: Fn(&Keypair, Arc<Bank>) -> Vec<Shred>,
     {
@@ -1782,10 +1587,10 @@
             let last_blockhash = bank0.last_blockhash();
             let mut bank0_progress = progress
                 .entry(bank0.slot())
-                .or_insert_with(|| ForkProgress::new(0, last_blockhash));
+                .or_insert_with(|| ForkProgress::new(last_blockhash));
             let shreds = shred_to_insert(&mint_keypair, bank0.clone());
             blockstore.insert_shreds(shreds, None, false).unwrap();
-            let (res, _tx_count) = ReplayStage::replay_blockstore_into_bank(
+            let res = ReplayStage::replay_blockstore_into_bank(
                 &bank0,
                 &blockstore,
                 &mut bank0_progress,
@@ -1801,7 +1606,7 @@
 
             // Check that the erroring bank was marked as dead in blockstore
             assert!(blockstore.is_dead(bank0.slot()));
-            res
+            res.map(|_| ())
         };
         let _ignored = remove_dir_all(&ledger_path);
         res
diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index 89abac68fe..3ee7aab8a7 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -2,6 +2,7 @@ use crate::{
     bank_forks::{BankForks, SnapshotConfig},
     blockstore::Blockstore,
     blockstore_processor::{self, BankForksInfo, BlockstoreProcessorError, ProcessOptions},
+    entry::VerifyRecyclers,
     leader_schedule_cache::LeaderScheduleCache,
     snapshot_utils,
 };
@@ -47,6 +48,7 @@ pub fn load(
             blockstore,
             Arc::new(deserialized_bank),
             &process_options,
+            &VerifyRecyclers::default(),
         );
     } else {
         info!("Snapshot package does not exist: {:?}", tar);
diff --git a/ledger/src/block_error.rs b/ledger/src/block_error.rs
index 91e5243c2e..89505ab3fa 100644
--- a/ledger/src/block_error.rs
+++ b/ledger/src/block_error.rs
@@ -2,6 +2,10 @@ use thiserror::Error;
 
 #[derive(Error, Debug, PartialEq)]
 pub enum BlockError {
+    /// Block did not have enough ticks or was not marked full
+    #[error("incomplete block")]
+    Incomplete,
+
     /// Block entries hashes must all be valid
     #[error("invalid entry hash")]
     InvalidEntryHash,
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index ada0908a36..e5cc53d38f 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1474,7 +1474,7 @@ impl Blockstore {
         &self,
         slot: Slot,
         start_index: u64,
-    ) -> Result<(Vec<Entry>, usize, bool)> {
+    ) -> Result<(Vec<Entry>, u64, bool)> {
         if self.is_dead(slot) {
             return Err(BlockstoreError::DeadSlot);
         }
@@ -1497,7 +1497,7 @@ impl Blockstore {
         let num_shreds = completed_ranges
             .last()
             .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
-            .unwrap_or(0) as usize;
+            .unwrap_or(0);
 
         let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 028b9b2708..ad6cab09d6 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -2,8 +2,9 @@ use crate::{
     bank_forks::BankForks,
     block_error::BlockError,
     blockstore::Blockstore,
+    blockstore_db::BlockstoreError,
     blockstore_meta::SlotMeta,
-    entry::{create_ticks, Entry, EntrySlice},
+    entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers},
     leader_schedule_cache::LeaderScheduleCache,
 };
 use crossbeam_channel::Sender;
@@ -11,7 +12,7 @@ use itertools::Itertools;
 use log::*;
 use rand::{seq::SliceRandom, thread_rng};
 use rayon::{prelude::*, ThreadPool};
-use solana_measure::thread_mem_usage;
+use solana_measure::{measure::Measure, thread_mem_usage};
 use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_runtime::{
@@ -24,7 +25,7 @@ use solana_sdk::{
     hash::Hash,
     signature::{Keypair, KeypairUtil},
     timing::duration_as_ms,
-    transaction::{Result, Transaction},
+    transaction::{Result, Transaction, TransactionError},
 };
 use std::{
     cell::RefCell,
@@ -234,10 +235,10 @@ pub struct BankForksInfo {
     pub bank_slot: u64,
 }
 
-#[derive(Error, Debug, PartialEq)]
+#[derive(Error, Debug)]
 pub enum BlockstoreProcessorError {
     #[error("failed to load entries")]
-    FailedToLoadEntries,
+    FailedToLoadEntries(#[from] BlockstoreError),
 
     #[error("failed to load meta")]
     FailedToLoadMeta,
@@ -246,7 +247,7 @@ pub enum BlockstoreProcessorError {
     InvalidBlock(#[from] BlockError),
 
     #[error("invalid transaction")]
-    InvalidTransaction,
+    InvalidTransaction(#[from] TransactionError),
 
     #[error("no valid forks found")]
     NoValidForksFound,
@@ -283,8 +284,9 @@ pub fn process_blockstore(
     // Setup bank for slot 0
     let bank0 = Arc::new(Bank::new_with_paths(&genesis_config, account_paths));
     info!("processing ledger for slot 0...");
-    process_bank_0(&bank0, blockstore, &opts)?;
-    process_blockstore_from_root(genesis_config, blockstore, bank0, &opts)
+    let recyclers = VerifyRecyclers::default();
+    process_bank_0(&bank0, blockstore, &opts, &recyclers)?;
+    process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers)
 }
 
 // Process blockstore from a known root bank
@@ -293,6 +295,7 @@ pub fn process_blockstore_from_root(
     blockstore: &Blockstore,
     bank: Arc<Bank>,
     opts: &ProcessOptions,
+    recyclers: &VerifyRecyclers,
 ) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlockstoreProcessorError>
 {
     info!("processing ledger from root slot {}...", bank.slot());
@@ -330,6 +333,7 @@ pub fn process_blockstore_from_root(
         &mut leader_schedule_cache,
         &mut rooted_path,
         opts,
+        recyclers,
     )?;
 
     let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().map(|(_, v)| v).unzip();
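With `#[from]` on the payload variants, the compiler-generated `From` impls let `?` convert the underlying errors into `BlockstoreProcessorError` automatically (the pattern `InvalidBlock(#[from] BlockError)` already used). A small sketch of the mechanism, using hypothetical `LoadError`/`TickError` source types:

use thiserror::Error;

#[derive(Error, Debug)]
#[error("failed to load entries")]
struct LoadError;

#[derive(Error, Debug)]
#[error("invalid tick count")]
struct TickError;

#[derive(Error, Debug)]
enum ProcessorError {
    #[error("failed to load entries")]
    FailedToLoadEntries(#[from] LoadError),

    #[error("invalid block")]
    InvalidBlock(#[from] TickError),
}

fn load() -> Result<(), LoadError> {
    Err(LoadError)
}

// `?` applies the generated ProcessorError::from(LoadError) automatically.
fn confirm() -> Result<(), ProcessorError> {
    load()?;
    Ok(())
}

fn main() {
    assert!(matches!(confirm(), Err(ProcessorError::FailedToLoadEntries(_))));
}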
@@ -366,55 +370,215 @@ pub fn process_blockstore_from_root(
     Ok((bank_forks, bank_forks_info, leader_schedule_cache))
 }
 
-fn verify_and_process_slot_entries(
+/// Verify that a segment of entries has the correct number of ticks and hashes
+pub fn verify_ticks(
     bank: &Arc<Bank>,
     entries: &[Entry],
-    last_entry_hash: Hash,
-    opts: &ProcessOptions,
-) -> result::Result<Hash, BlockstoreProcessorError> {
-    assert!(!entries.is_empty());
+    slot_full: bool,
+    tick_hash_count: &mut u64,
+) -> std::result::Result<(), BlockError> {
+    let next_bank_tick_height = bank.tick_height() + entries.tick_count();
+    let max_bank_tick_height = bank.max_tick_height();
+    if next_bank_tick_height > max_bank_tick_height {
+        warn!("Too many entry ticks found in slot: {}", bank.slot());
+        return Err(BlockError::InvalidTickCount);
+    }
 
-    if opts.poh_verify {
-        let next_bank_tick_height = bank.tick_height() + entries.tick_count();
-        let max_bank_tick_height = bank.max_tick_height();
-        if next_bank_tick_height != max_bank_tick_height {
-            warn!(
-                "Invalid number of entry ticks found in slot: {}",
-                bank.slot()
-            );
-            return Err(BlockError::InvalidTickCount.into());
-        } else if !entries.last().unwrap().is_tick() {
+    if next_bank_tick_height < max_bank_tick_height && slot_full {
+        warn!("Too few entry ticks found in slot: {}", bank.slot());
+        return Err(BlockError::InvalidTickCount);
+    }
+
+    if next_bank_tick_height == max_bank_tick_height {
+        let has_trailing_entry = entries.last().map(|e| !e.is_tick()).unwrap_or_default();
+        if has_trailing_entry {
             warn!("Slot: {} did not end with a tick entry", bank.slot());
-            return Err(BlockError::TrailingEntry.into());
+            return Err(BlockError::TrailingEntry);
         }
 
-        if let Some(hashes_per_tick) = bank.hashes_per_tick() {
-            if !entries.verify_tick_hash_count(&mut 0, *hashes_per_tick) {
-                warn!(
-                    "Tick with invalid number of hashes found in slot: {}",
-                    bank.slot()
-                );
-                return Err(BlockError::InvalidTickHashCount.into());
-            }
-        }
-
-        if !entries.verify(&last_entry_hash) {
-            warn!("Ledger proof of history failed at slot: {}", bank.slot());
-            return Err(BlockError::InvalidEntryHash.into());
+        if !slot_full {
+            warn!("Slot: {} was not marked full", bank.slot());
+            return Err(BlockError::InvalidLastTick);
         }
     }
 
-    process_entries_with_callback(bank, &entries, true, opts.entry_callback.as_ref(), None)
-        .map_err(|err| {
-            warn!(
-                "Failed to process entries for slot {}: {:?}",
-                bank.slot(),
-                err
-            );
-            BlockstoreProcessorError::InvalidTransaction
-        })?;
+    let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
+    if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
+        warn!(
+            "Tick with invalid number of hashes found in slot: {}",
+            bank.slot()
+        );
+        return Err(BlockError::InvalidTickHashCount);
+    }
 
-    Ok(entries.last().unwrap().hash)
+    Ok(())
+}
+
+fn confirm_full_slot(
+    blockstore: &Blockstore,
+    bank: &Arc<Bank>,
+    last_entry_hash: &Hash,
+    opts: &ProcessOptions,
+    recyclers: &VerifyRecyclers,
+) -> result::Result<(), BlockstoreProcessorError> {
+    let mut timing = ConfirmationTiming::default();
+    let mut progress = ConfirmationProgress::new(*last_entry_hash);
+    let skip_verification = !opts.poh_verify;
+    confirm_slot(
+        blockstore,
+        bank,
+        &mut timing,
+        &mut progress,
+        skip_verification,
+        None,
+        opts.entry_callback.as_ref(),
+        recyclers,
+    )?;
+
+    if !bank.is_complete() {
+        Err(BlockstoreProcessorError::InvalidBlock(
+            BlockError::Incomplete,
+        ))
+    } else {
+        Ok(())
+    }
+}
+
+pub struct ConfirmationTiming {
+    pub started: Instant,
+    pub replay_elapsed: u64,
+    pub verify_elapsed: u64,
+    pub fetch_elapsed: u64,
+    pub fetch_fail_elapsed: u64,
+}
+
+impl Default for ConfirmationTiming {
+    fn default() -> Self {
+        Self {
+            started: Instant::now(),
+            replay_elapsed: 0,
+            verify_elapsed: 0,
+            fetch_elapsed: 0,
+            fetch_fail_elapsed: 0,
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct ConfirmationProgress {
+    pub last_entry: Hash,
+    pub tick_hash_count: u64,
+    pub num_shreds: u64,
+    pub num_entries: usize,
+    pub num_txs: usize,
+}
+
+impl ConfirmationProgress {
+    pub fn new(last_entry: Hash) -> Self {
+        Self {
+            last_entry,
+            ..Self::default()
+        }
+    }
+}
+
+pub fn confirm_slot(
+    blockstore: &Blockstore,
+    bank: &Arc<Bank>,
+    timing: &mut ConfirmationTiming,
+    progress: &mut ConfirmationProgress,
+    skip_verification: bool,
+    transaction_status_sender: Option<TransactionStatusSender>,
+    entry_callback: Option<&ProcessCallback>,
+    recyclers: &VerifyRecyclers,
+) -> result::Result<(), BlockstoreProcessorError> {
+    let slot = bank.slot();
+
+    let (entries, num_shreds, slot_full) = {
+        let mut load_elapsed = Measure::start("load_elapsed");
+        let load_result = blockstore
+            .get_slot_entries_with_shred_info(slot, progress.num_shreds)
+            .map_err(BlockstoreProcessorError::FailedToLoadEntries);
+        load_elapsed.stop();
+        if load_result.is_err() {
+            timing.fetch_fail_elapsed += load_elapsed.as_us();
+        } else {
+            timing.fetch_elapsed += load_elapsed.as_us();
+        }
+        load_result
+    }?;
+
+    let num_entries = entries.len();
+    let num_txs = entries.iter().map(|e| e.transactions.len()).sum::<usize>();
+    trace!(
+        "Fetched entries for slot {}, num_entries: {}, num_shreds: {}, num_txs: {}, slot_full: {}",
+        slot,
+        num_entries,
+        num_shreds,
+        num_txs,
+        slot_full,
+    );
+
+    if !skip_verification {
+        let tick_hash_count = &mut progress.tick_hash_count;
+        verify_ticks(bank, &entries, slot_full, tick_hash_count).map_err(|err| {
+            warn!(
+                "{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}, slot_full: {}",
+                err,
+                slot,
+                num_entries,
+                bank.tick_height(),
+                progress.last_entry,
+                bank.last_blockhash(),
+                num_shreds,
+                slot_full,
+            );
+            err
+        })?;
+    }
+
+    let verifier = if !skip_verification {
+        datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64));
+        let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone());
+        if entry_state.status() == EntryVerificationStatus::Failure {
+            warn!("Ledger proof of history failed at slot: {}", slot);
+            return Err(BlockError::InvalidEntryHash.into());
+        }
+        Some(entry_state)
+    } else {
+        None
+    };
+
+    let mut replay_elapsed = Measure::start("replay_elapsed");
+    let process_result = process_entries_with_callback(
+        bank,
+        &entries,
+        true,
+        entry_callback,
+        transaction_status_sender,
+    )
+    .map_err(BlockstoreProcessorError::from);
+    replay_elapsed.stop();
+    timing.replay_elapsed += replay_elapsed.as_us();
+
+    if let Some(mut verifier) = verifier {
+        if !verifier.finish_verify(&entries) {
+            warn!("Ledger proof of history failed at slot: {}", bank.slot());
+            return Err(BlockError::InvalidEntryHash.into());
+        }
+        timing.verify_elapsed += verifier.duration_ms();
+    }
+
+    process_result?;
+
+    progress.num_shreds += num_shreds;
+    progress.num_entries += num_entries;
+    progress.num_txs += num_txs;
+    if let Some(last_entry) = entries.last() {
+        progress.last_entry = last_entry.hash;
+    }
+
+    Ok(())
+}
 
 // Special handling required for processing the entries in slot 0
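`confirm_slot` is now the shared entry point for startup replay (via `confirm_full_slot`) and for ReplayStage. Because `ConfirmationProgress` persists between calls, a caller can invoke it repeatedly as shreds arrive and only the unprocessed tail of the slot is fetched and replayed. The bookkeeping in miniature; `Entry`, `Progress`, and `confirm` below are simplified stand-ins, not the real types:

#[derive(Default)]
struct Progress {
    num_entries: usize,
    num_txs: usize,
}

struct Entry {
    txs: usize,
}

fn confirm(ledger: &[Entry], progress: &mut Progress) {
    // Fetch only the entries past what was already processed.
    let new = &ledger[progress.num_entries..];
    for entry in new {
        progress.num_txs += entry.txs; // "replay" the newly arrived transactions
    }
    progress.num_entries += new.len();
}

fn main() {
    let mut ledger = vec![Entry { txs: 2 }, Entry { txs: 3 }];
    let mut progress = Progress::default();

    confirm(&ledger, &mut progress); // first pass: two entries, five txs
    assert_eq!((progress.num_entries, progress.num_txs), (2, 5));

    ledger.push(Entry { txs: 4 }); // more shreds arrive for the slot
    confirm(&ledger, &mut progress); // second pass touches only the new entry
    assert_eq!((progress.num_entries, progress.num_txs), (3, 9));
}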
@@ -422,20 +586,12 @@ fn process_bank_0(
     bank0: &Arc<Bank>,
     blockstore: &Blockstore,
     opts: &ProcessOptions,
+    recyclers: &VerifyRecyclers,
 ) -> result::Result<(), BlockstoreProcessorError> {
     assert_eq!(bank0.slot(), 0);
-
-    // Fetch all entries for this slot
-    let entries = blockstore.get_slot_entries(0, 0, None).map_err(|err| {
-        warn!("Failed to load entries for slot 0, err: {:?}", err);
-        BlockstoreProcessorError::FailedToLoadEntries
-    })?;
-
-    verify_and_process_slot_entries(bank0, &entries, bank0.last_blockhash(), opts)
+    confirm_full_slot(blockstore, bank0, &bank0.last_blockhash(), opts, recyclers)
         .expect("processing for bank 0 must succeed");
-
     bank0.freeze();
-
     Ok(())
 }
@@ -508,6 +664,7 @@ fn process_pending_slots(
     leader_schedule_cache: &mut LeaderScheduleCache,
     rooted_path: &mut Vec<u64>,
     opts: &ProcessOptions,
+    recyclers: &VerifyRecyclers,
 ) -> result::Result<HashMap<u64, (Arc<Bank>, BankForksInfo)>, BlockstoreProcessorError> {
     let mut fork_info = HashMap::new();
     let mut last_status_report = Instant::now();
@@ -537,7 +694,7 @@ fn process_pending_slots(
         let allocated = thread_mem_usage::Allocatedp::default();
         let initial_allocation = allocated.get();
 
-        if process_single_slot(blockstore, &bank, &last_entry_hash, opts).is_err() {
+        if process_single_slot(blockstore, &bank, &last_entry_hash, opts, recyclers).is_err() {
             continue;
         }
@@ -584,19 +741,15 @@ fn process_single_slot(
     bank: &Arc<Bank>,
     last_entry_hash: &Hash,
     opts: &ProcessOptions,
+    recyclers: &VerifyRecyclers,
 ) -> result::Result<(), BlockstoreProcessorError> {
-    let slot = bank.slot();
-
-    // Fetch all entries for this slot
-    let entries = blockstore.get_slot_entries(slot, 0, None).map_err(|err| {
-        warn!("Failed to load entries for slot {}: {:?}", slot, err);
-        BlockstoreProcessorError::FailedToLoadEntries
-    })?;
-
-    // If this errors with a fatal error, should mark the slot as dead so
-    // validators don't replay this slot and see DuplicateSignature errors
-    // later in ReplayStage
-    verify_and_process_slot_entries(&bank, &entries, *last_entry_hash, opts).map_err(|err| {
+    // Mark corrupt slots as dead so validators don't replay this slot and
+    // see DuplicateSignature errors later in ReplayStage
+    confirm_full_slot(blockstore, bank, last_entry_hash, opts, recyclers).map_err(|err| {
+        let slot = bank.slot();
+        blockstore
+            .set_dead_slot(slot)
+            .expect("Failed to mark slot as dead in blockstore");
         warn!("slot {} failed to verify: {}", slot, err);
         err
     })?;
@@ -918,7 +1071,7 @@ pub mod tests {
         }
     );
 
-    /* Add a complete slot such that the tree looks like:
+    /* Add a complete slot such that the store looks like:
 
                                  slot 0 (all ticks)
                                /                  \
@@ -2246,16 +2399,23 @@ pub mod tests {
             poh_verify: true,
             ..ProcessOptions::default()
         };
-        process_bank_0(&bank0, &blockstore, &opts).unwrap();
+        let recyclers = VerifyRecyclers::default();
+        process_bank_0(&bank0, &blockstore, &opts, &recyclers).unwrap();
         let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
-        let slot1_entries = blockstore.get_slot_entries(1, 0, None).unwrap();
-        verify_and_process_slot_entries(&bank1, &slot1_entries, bank0.last_blockhash(), &opts)
-            .unwrap();
+        confirm_full_slot(
+            &blockstore,
+            &bank1,
+            &bank0.last_blockhash(),
+            &opts,
+            &recyclers,
+        )
+        .unwrap();
         bank1.squash();
 
         // Test process_blockstore_from_root() from slot 1 onwards
         let (bank_forks, bank_forks_info, _) =
-            process_blockstore_from_root(&genesis_config, &blockstore, bank1, &opts).unwrap();
+            process_blockstore_from_root(&genesis_config, &blockstore, bank1, &opts, &recyclers)
+                .unwrap();
 
         assert_eq!(bank_forks_info.len(), 1); // One fork
         assert_eq!(
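`VerifyRecyclers` is threaded through all of these entry points so that repeated slot confirmations can reuse verification buffers instead of reallocating them for every slot. The idea in miniature, with a hypothetical `Recyclers` type (the real one pools verification buffers; this is only a sketch of the allocate-once, reuse-everywhere shape):

use std::cell::RefCell;

// Allocate-once, reuse-everywhere buffer pool.
#[derive(Default)]
struct Recyclers {
    buffers: RefCell<Vec<Vec<u8>>>,
}

impl Recyclers {
    fn allocate(&self) -> Vec<u8> {
        // Reuse a returned buffer if one is available, else allocate fresh.
        self.buffers.borrow_mut().pop().unwrap_or_else(|| vec![0; 1024])
    }

    fn recycle(&self, buf: Vec<u8>) {
        self.buffers.borrow_mut().push(buf);
    }
}

fn confirm_slot_sketch(recyclers: &Recyclers, slot: u64) {
    let buf = recyclers.allocate();
    println!("slot {} verified with a {}-byte buffer", slot, buf.len());
    recyclers.recycle(buf); // return it for the next slot
}

fn main() {
    let recyclers = Recyclers::default(); // created once, at startup
    for slot in 0..3 {
        confirm_slot_sketch(&recyclers, slot); // shared by every slot
    }
}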
diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs
index a91d16bb09..bd03265c2b 100644
--- a/ledger/src/entry.rs
+++ b/ledger/src/entry.rs
@@ -155,7 +155,7 @@ pub struct VerificationData {
     verification_status: EntryVerificationStatus,
     hashes: Option<Arc<Mutex<PinnedVec<Hash>>>>,
     tx_hashes: Vec<Option<Hash>>,
-    start_time_ms: u64,
+    duration_ms: u64,
 }
 
 #[derive(Default, Clone)]
@@ -184,6 +184,13 @@
         }
     }
 
+    pub fn duration_ms(&self) -> u64 {
+        match self {
+            EntryVerificationState::CPU(state) => state.duration_ms,
+            EntryVerificationState::GPU(state) => state.duration_ms,
+        }
+    }
+
     pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
         match self {
             EntryVerificationState::GPU(verification_state) => {
@@ -217,10 +224,10 @@
                 });
                 verify_check_time.stop();
 
+                verification_state.duration_ms += gpu_time_ms + verify_check_time.as_ms();
                 inc_new_counter_warn!(
                     "entry_verify-duration",
-                    (gpu_time_ms + verify_check_time.as_ms() + verification_state.start_time_ms)
-                        as usize
+                    verification_state.duration_ms as usize
                 );
 
                 verification_state.verification_status = if res {
@@ -281,10 +288,8 @@ impl EntrySlice for [Entry] {
                 })
             })
         });
-        inc_new_counter_warn!(
-            "entry_verify-duration",
-            timing::duration_as_ms(&now.elapsed()) as usize
-        );
+        let duration_ms = timing::duration_as_ms(&now.elapsed());
+        inc_new_counter_warn!("entry_verify-duration", duration_ms as usize);
         EntryVerificationState::CPU(VerificationData {
             thread_h: None,
             verification_status: if res {
@@ -294,7 +299,7 @@ impl EntrySlice for [Entry] {
             },
             hashes: None,
             tx_hashes: vec![],
-            start_time_ms: 0,
+            duration_ms,
         })
     }
 
@@ -382,7 +387,7 @@ impl EntrySlice for [Entry] {
             thread_h: Some(gpu_verify_thread),
             verification_status: EntryVerificationStatus::Pending,
             tx_hashes,
-            start_time_ms: timing::duration_as_ms(&start.elapsed()),
+            duration_ms: timing::duration_as_ms(&start.elapsed()),
             hashes: Some(hashes),
         })
     }
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index a5c0d23cb8..2634012e34 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -902,6 +902,10 @@ impl Bank {
         }
     }
 
+    pub fn is_complete(&self) -> bool {
+        self.tick_height() == self.max_tick_height()
+    }
+
     pub fn is_block_boundary(&self, tick_height: u64) -> bool {
         tick_height % self.ticks_per_slot == 0
     }
diff --git a/runtime/tests/bank.rs b/runtime/tests/bank.rs
index 7f4b175988..c743c6b660 100644
--- a/runtime/tests/bank.rs
+++ b/runtime/tests/bank.rs
@@ -20,7 +20,7 @@ fn test_race_register_tick_freeze() {
         let freeze_thread = Builder::new()
             .name("freeze".to_string())
             .spawn(move || loop {
-                if bank0_.tick_height() == bank0_.max_tick_height() {
+                if bank0_.is_complete() {
                     assert_eq!(bank0_.last_blockhash(), hash);
                     break;
                 }
diff --git a/sdk/src/transaction.rs b/sdk/src/transaction.rs
index ad8974d618..6da812d73f 100644
--- a/sdk/src/transaction.rs
+++ b/sdk/src/transaction.rs
@@ -9,9 +9,10 @@ use crate::signature::{KeypairUtil, Signature};
 use crate::system_instruction;
 use bincode::serialize;
 use std::result;
+use thiserror::Error;
 
 /// Reasons a transaction might be rejected.
-#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
+#[derive(Error, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
 pub enum TransactionError {
     /// This Pubkey is being processed in another transaction
     AccountInUse,
@@ -59,6 +60,12 @@ pub enum TransactionError {
 
 pub type Result<T> = result::Result<T, TransactionError>;
 
+impl std::fmt::Display for TransactionError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "transaction error")
+    }
+}
+
 /// An atomic transaction
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
 pub struct Transaction {
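A closing note on the `TransactionError` change: thiserror only generates `Display` from `#[error(...)]` attributes, and none are attached to the variants here, so the derive is paired with the manual (deliberately coarse) `Display` impl above. What the pairing buys is a `std::error::Error` impl, which is what allows `BlockstoreProcessorError::InvalidTransaction` to take the type via `#[from]`. The same arrangement on a hypothetical two-variant enum:

use std::error::Error;
use std::fmt;
use thiserror::Error as ThisError;

// Hypothetical stand-in for TransactionError: derive(Error) supplies
// std::error::Error, but Display has to come from somewhere...
#[derive(ThisError, Debug)]
enum TxError {
    AccountInUse,
    #[allow(dead_code)]
    AccountNotFound,
}

// ...so, as in the diff, one manual Display covers every variant.
impl fmt::Display for TxError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "transaction error")
    }
}

fn main() {
    // With Error + Display in place, the type works as a boxed error object.
    let err: Box<dyn Error> = Box::new(TxError::AccountInUse);
    println!("{}", err); // prints "transaction error"
}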