Consolidate entry tick verification into one function (#7740)

* Consolidate entry tick verification into one function

* Mark bad slots as dead in blocktree processor

* more feedback

* Add bank.is_complete

* feedback
Author: Justin Starry
Date: 2020-01-15 09:15:26 +08:00
Committed by: GitHub
Parent: 721c4378c1
Commit: ff1ca1e0d3
9 changed files with 369 additions and 382 deletions
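
The third bullet, "Add bank.is_complete", surfaces near the end of the diff below, where the explicit bank.tick_height() == bank.max_tick_height() check is replaced by bank.is_complete(). The method itself lands in solana_runtime's Bank (one of the other changed files, not shown here); a toy sketch of the predicate, inferred only from the condition it replaces:

// Toy model; the field names are stand-ins, but the predicate mirrors the
// condition replaced below (bank.tick_height() == bank.max_tick_height()).
struct Bank {
    tick_height: u64,
    max_tick_height: u64,
}

impl Bank {
    fn is_complete(&self) -> bool {
        // A slot's bank is complete once every tick allotted to it has been registered.
        self.tick_height == self.max_tick_height
    }
}

fn main() {
    let bank = Bank { tick_height: 64, max_tick_height: 64 };
    assert!(bank.is_complete());
}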


@@ -5,20 +5,21 @@ use crate::{
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
consensus::{StakeLockout, Tower},
poh_recorder::PohRecorder,
result::{Error, Result},
result::Result,
rpc_subscriptions::RpcSubscriptions,
};
use solana_ledger::entry::EntryVerificationStatus;
use solana_ledger::{
bank_forks::BankForks,
block_error::BlockError,
blockstore::{Blockstore, BlockstoreError},
blockstore_processor::{self, TransactionStatusSender},
entry::{Entry, EntrySlice, VerifyRecyclers},
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, ConfirmationTiming,
TransactionStatusSender,
},
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
snapshot_package::SnapshotPackageSender,
};
use solana_measure::{measure::Measure, thread_mem_usage};
use solana_measure::thread_mem_usage;
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank;
use solana_sdk::{
@@ -32,6 +33,7 @@ use solana_sdk::{
use solana_vote_program::vote_instruction;
use std::{
collections::{HashMap, HashSet},
result,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, RecvTimeoutError, Sender},
@@ -84,14 +86,18 @@ pub struct ReplayStage {
commitment_service: AggregateCommitmentService,
}
struct ReplaySlotStats {
// Per-slot elapsed time
slot: Slot,
fetch_entries_elapsed: u64,
fetch_entries_fail_elapsed: u64,
entry_verification_elapsed: u64,
replay_elapsed: u64,
replay_start: Instant,
#[derive(Default)]
pub struct ReplaySlotStats(ConfirmationTiming);
impl std::ops::Deref for ReplaySlotStats {
type Target = ConfirmationTiming;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for ReplaySlotStats {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
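
ReplaySlotStats is now a newtype over blockstore_processor's ConfirmationTiming, and the Deref/DerefMut impls above let the reporting code keep writing self.fetch_elapsed, self.verify_elapsed, and so on without a .0 projection. A self-contained illustration of the pattern, with Timing standing in for the real ConfirmationTiming:

use std::ops::{Deref, DerefMut};

// Stand-in for the wrapped timing struct.
#[derive(Default)]
struct Timing {
    replay_elapsed: u64,
}

// Same newtype-plus-Deref shape as ReplaySlotStats: a distinct type that
// still exposes the inner fields transparently.
#[derive(Default)]
struct Stats(Timing);

impl Deref for Stats {
    type Target = Timing;
    fn deref(&self) -> &Timing {
        &self.0
    }
}

impl DerefMut for Stats {
    fn deref_mut(&mut self) -> &mut Timing {
        &mut self.0
    }
}

fn main() {
    let mut stats = Stats::default();
    stats.replay_elapsed += 42; // auto-deref reaches the inner field
    assert_eq!(stats.replay_elapsed, 42);
}

It also means a &mut ReplaySlotStats coerces to the &mut ConfirmationTiming that confirm_slot takes later in this diff.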
#[derive(Debug, Clone, Default)]
@@ -112,66 +118,43 @@ struct ForkStats {
}
impl ReplaySlotStats {
pub fn new(slot: Slot) -> Self {
Self {
slot,
fetch_entries_elapsed: 0,
fetch_entries_fail_elapsed: 0,
entry_verification_elapsed: 0,
replay_elapsed: 0,
replay_start: Instant::now(),
}
}
pub fn report_stats(&self, total_entries: usize, total_shreds: usize) {
pub fn report_stats(&self, slot: Slot, num_entries: usize, num_shreds: u64) {
datapoint_info!(
"replay-slot-stats",
("slot", self.slot as i64, i64),
("fetch_entries_time", self.fetch_entries_elapsed as i64, i64),
("slot", slot as i64, i64),
("fetch_entries_time", self.fetch_elapsed as i64, i64),
(
"fetch_entries_fail_time",
self.fetch_entries_fail_elapsed as i64,
i64
),
(
"entry_verification_time",
self.entry_verification_elapsed as i64,
self.fetch_fail_elapsed as i64,
i64
),
("entry_verification_time", self.verify_elapsed as i64, i64),
("replay_time", self.replay_elapsed as i64, i64),
(
"replay_total_elapsed",
self.replay_start.elapsed().as_micros() as i64,
self.started.elapsed().as_micros() as i64,
i64
),
("total_entries", total_entries as i64, i64),
("total_shreds", total_shreds as i64, i64),
("total_entries", num_entries as i64, i64),
("total_shreds", num_shreds as i64, i64),
);
}
}
struct ForkProgress {
last_entry: Hash,
num_shreds: usize,
num_entries: usize,
tick_hash_count: u64,
started_ms: u64,
is_dead: bool,
stats: ReplaySlotStats,
fork_stats: ForkStats,
replay_stats: ReplaySlotStats,
replay_progress: ConfirmationProgress,
}
impl ForkProgress {
pub fn new(slot: Slot, last_entry: Hash) -> Self {
pub fn new(last_entry: Hash) -> Self {
Self {
last_entry,
num_shreds: 0,
num_entries: 0,
tick_hash_count: 0,
started_ms: timing::timestamp(),
is_dead: false,
stats: ReplaySlotStats::new(slot),
fork_stats: ForkStats::default(),
replay_stats: ReplaySlotStats::default(),
replay_progress: ConfirmationProgress::new(last_entry),
}
}
}
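
ForkProgress now delegates its per-slot bookkeeping to two new blockstore_processor types whose definitions live in one of the other eight changed files. The sketch below reconstructs them purely from the fields this diff touches (started, fetch_elapsed, fetch_fail_elapsed, verify_elapsed, replay_elapsed on the timing side; last_entry, num_shreds, num_entries, num_txs on the progress side), so anything beyond those uses is an assumption:

use std::time::Instant;

type Hash = [u8; 32]; // placeholder for solana_sdk::hash::Hash

// Reconstructed from usage in this diff only; the real definitions live in
// solana_ledger::blockstore_processor.
pub struct ConfirmationTiming {
    pub started: Instant, // drives replay_total_elapsed and the confirmation duration
    pub fetch_elapsed: u64,
    pub fetch_fail_elapsed: u64,
    pub verify_elapsed: u64,
    pub replay_elapsed: u64,
}

impl Default for ConfirmationTiming {
    // Instant has no Default impl, so `started` is stamped at construction,
    // which is what lets ReplaySlotStats simply derive Default above.
    fn default() -> Self {
        Self {
            started: Instant::now(),
            fetch_elapsed: 0,
            fetch_fail_elapsed: 0,
            verify_elapsed: 0,
            replay_elapsed: 0,
        }
    }
}

#[derive(Default)]
pub struct ConfirmationProgress {
    pub last_entry: Hash, // seeded from bank.last_blockhash() via ForkProgress::new
    pub num_shreds: u64,
    pub num_entries: usize,
    pub num_txs: usize,
}

impl ConfirmationProgress {
    pub fn new(last_entry: Hash) -> Self {
        Self {
            last_entry,
            ..Self::default()
        }
    }
}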
@@ -217,10 +200,7 @@ impl ReplayStage {
let mut progress = HashMap::new();
// Initialize progress map with any root banks
for bank in bank_forks.read().unwrap().frozen_banks().values() {
progress.insert(
bank.slot(),
ForkProgress::new(bank.slot(), bank.last_blockhash()),
);
progress.insert(bank.slot(), ForkProgress::new(bank.last_blockhash()));
}
let mut current_leader = None;
let mut last_reset = Hash::default();
@@ -525,83 +505,43 @@ impl ReplayStage {
}
}
// Returns true if the `result` is a fatal error, which is an error that will cause a
// bank to be marked as dead/corrupted
fn is_replay_result_fatal(result: &Result<()>) -> bool {
match result {
Err(Error::TransactionError(e)) => {
// Transactions with errors that can't be committed mean this fork is bogus
let tx_error = Err(e.clone());
!Bank::can_commit(&tx_error)
}
Err(Error::BlockError(_)) => true,
Err(Error::BlockstoreError(BlockstoreError::InvalidShredData(_))) => true,
Err(Error::BlockstoreError(BlockstoreError::DeadSlot)) => true,
_ => false,
}
}
// Returns the replay result and the number of replayed transactions
fn replay_blockstore_into_bank(
bank: &Arc<Bank>,
blockstore: &Blockstore,
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers,
) -> (Result<()>, usize) {
let mut tx_count = 0;
let now = Instant::now();
let load_result =
Self::load_blockstore_entries_with_shred_info(bank, blockstore, bank_progress);
let fetch_entries_elapsed = now.elapsed().as_micros();
if load_result.is_err() {
bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
} else {
bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64;
}
) -> result::Result<usize, BlockstoreProcessorError> {
let tx_count_before = bank_progress.replay_progress.num_txs;
let confirm_result = blockstore_processor::confirm_slot(
blockstore,
bank,
&mut bank_progress.replay_stats,
&mut bank_progress.replay_progress,
false,
transaction_status_sender,
None,
verify_recyclers,
);
let tx_count_after = bank_progress.replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
let replay_result = load_result.and_then(|(entries, num_shreds, slot_full)| {
trace!(
"Fetch entries for slot {}, {:?} entries, num shreds {}, slot_full: {}",
bank.slot(),
entries.len(),
num_shreds,
slot_full,
);
tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
Self::replay_entries_into_bank(
bank,
bank_progress,
entries,
num_shreds,
slot_full,
transaction_status_sender,
verify_recyclers,
)
});
if Self::is_replay_result_fatal(&replay_result) {
warn!(
"Fatal replay result in slot: {}, result: {:?}",
bank.slot(),
replay_result
);
confirm_result.map_err(|err| {
let slot = bank.slot();
warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
datapoint_error!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", replay_result), String),
("slot", bank.slot(), i64)
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
Self::mark_dead_slot(bank.slot(), blockstore, bank_progress);
}
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
err
})?;
(replay_result, tx_count)
}
fn mark_dead_slot(slot: Slot, blockstore: &Blockstore, bank_progress: &mut ForkProgress) {
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
Ok(tx_count)
}
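
The rewritten function delegates fetching, tick verification, and transaction replay to blockstore_processor::confirm_slot. The call site above implies a signature along the lines of confirm_slot(blockstore, bank, timing, progress, skip_verification, transaction_status_sender, entry_callback, recyclers) returning Result<(), BlockstoreProcessorError>, though those parameter names are guesses; the only logic kept here is the fatal-error bookkeeping, which rides on map_err so the side effects (warn, datapoint, set_dead_slot) run exactly once before ? propagates the error. A minimal runnable model of that shape, with every name invented:

#[derive(Debug)]
enum ReplayError {
    InvalidBlock,
}

fn mark_dead_slot(slot: u64) {
    eprintln!("marking slot {} dead", slot);
}

fn replay(slot: u64, fail: bool) -> Result<usize, ReplayError> {
    let confirm_result: Result<(), ReplayError> =
        if fail { Err(ReplayError::InvalidBlock) } else { Ok(()) };

    confirm_result.map_err(|err| {
        // Side effects run exactly once here before `?` propagates the error.
        mark_dead_slot(slot);
        err
    })?;

    Ok(42) // stands in for the happy-path tx_count
}

fn main() {
    assert!(replay(1, false).is_ok());
    assert!(replay(2, true).is_err());
}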
#[allow(clippy::too_many_arguments)]
@@ -758,30 +698,32 @@ impl ReplayStage {
// this bank in `select_fork()`
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
if bank.collector_id() != my_pubkey {
let (replay_result, replay_tx_count) = Self::replay_blockstore_into_bank(
let replay_result = Self::replay_blockstore_into_bank(
&bank,
&blockstore,
bank_progress,
transaction_status_sender.clone(),
verify_recyclers,
);
tx_count += replay_tx_count;
if Self::is_replay_result_fatal(&replay_result) {
trace!("replay_result_fatal slot {}", bank_slot);
// If the bank was corrupted, don't try to run the below logic to check if the
// bank is completed
continue;
match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count,
Err(err) => {
trace!("replay_result err: {:?}, slot {}", err, bank_slot);
// If the bank was corrupted, don't try to run the below logic to check if the
// bank is completed
continue;
}
}
}
assert_eq!(*bank_slot, bank.slot());
if bank.tick_height() == bank.max_tick_height() {
if let Some(bank_progress) = &mut progress.get(&bank.slot()) {
bank_progress
.stats
.report_stats(bank_progress.num_entries, bank_progress.num_shreds);
}
if bank.is_complete() {
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true;
Self::process_completed_bank(my_pubkey, bank, slot_full_senders);
} else {
@@ -939,7 +881,7 @@ impl ReplayStage {
) {
for (slot, prog) in progress.iter_mut() {
if !prog.fork_stats.confirmation_reported {
let duration = timing::timestamp() - prog.started_ms;
let duration = prog.replay_stats.started.elapsed().as_millis();
if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked)
&& bank_forks
.read()
@@ -963,141 +905,6 @@ impl ReplayStage {
}
}
fn load_blockstore_entries_with_shred_info(
bank: &Bank,
blockstore: &Blockstore,
bank_progress: &mut ForkProgress,
) -> Result<(Vec<Entry>, usize, bool)> {
blockstore
.get_slot_entries_with_shred_info(bank.slot(), bank_progress.num_shreds as u64)
.map_err(|err| err.into())
}
fn replay_entries_into_bank(
bank: &Arc<Bank>,
bank_progress: &mut ForkProgress,
entries: Vec<Entry>,
num_shreds: usize,
slot_full: bool,
transaction_status_sender: Option<TransactionStatusSender>,
verify_recyclers: &VerifyRecyclers,
) -> Result<()> {
let result = Self::verify_and_process_entries(
&bank,
&entries,
slot_full,
bank_progress.num_shreds,
bank_progress,
transaction_status_sender,
verify_recyclers,
);
bank_progress.num_shreds += num_shreds;
bank_progress.num_entries += entries.len();
if let Some(last_entry) = entries.last() {
bank_progress.last_entry = last_entry.hash;
}
result
}
fn verify_ticks(
bank: &Arc<Bank>,
entries: &[Entry],
slot_full: bool,
tick_hash_count: &mut u64,
) -> std::result::Result<(), BlockError> {
let next_bank_tick_height = bank.tick_height() + entries.tick_count();
let max_bank_tick_height = bank.max_tick_height();
if next_bank_tick_height > max_bank_tick_height {
return Err(BlockError::InvalidTickCount);
}
if next_bank_tick_height < max_bank_tick_height && slot_full {
return Err(BlockError::InvalidTickCount);
}
if next_bank_tick_height == max_bank_tick_height {
let has_trailing_entry = !entries.last().unwrap().is_tick();
if has_trailing_entry {
return Err(BlockError::TrailingEntry);
}
if !slot_full {
return Err(BlockError::InvalidLastTick);
}
}
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
return Err(BlockError::InvalidTickHashCount);
}
Ok(())
}
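
This is the replay_stage copy of verify_ticks that the commit title says was consolidated into blockstore_processor. Its decision ladder reduces to four checks; the self-contained model below strips the bank and entry slice down to plain values to make the cases explicit (names and the numeric example are illustrative only):

#[derive(Debug, PartialEq)]
enum BlockError {
    InvalidTickCount,
    TrailingEntry,
    InvalidLastTick,
    InvalidTickHashCount,
}

fn verify_ticks_model(
    next_tick_height: u64,
    max_tick_height: u64,
    slot_full: bool,
    last_entry_is_tick: bool,
    tick_hash_count_ok: bool,
) -> Result<(), BlockError> {
    if next_tick_height > max_tick_height {
        return Err(BlockError::InvalidTickCount); // too many ticks
    }
    if next_tick_height < max_tick_height && slot_full {
        return Err(BlockError::InvalidTickCount); // slot marked full with too few ticks
    }
    if next_tick_height == max_tick_height {
        if !last_entry_is_tick {
            return Err(BlockError::TrailingEntry); // entries after the final tick
        }
        if !slot_full {
            return Err(BlockError::InvalidLastTick); // final tick but slot not marked full
        }
    }
    if !tick_hash_count_ok {
        return Err(BlockError::InvalidTickHashCount); // wrong hashes-per-tick count
    }
    Ok(())
}

fn main() {
    assert_eq!(
        verify_ticks_model(65, 64, true, true, true),
        Err(BlockError::InvalidTickCount)
    );
    assert_eq!(verify_ticks_model(64, 64, true, true, true), Ok(()));
}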
fn verify_and_process_entries(
bank: &Arc<Bank>,
entries: &[Entry],
slot_full: bool,
shred_index: usize,
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>,
recyclers: &VerifyRecyclers,
) -> Result<()> {
let last_entry = &bank_progress.last_entry;
let tick_hash_count = &mut bank_progress.tick_hash_count;
let handle_block_error = move |block_error: BlockError| -> Result<()> {
warn!(
"{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}, slot_full: {}",
block_error,
bank.slot(),
entries.len(),
bank.tick_height(),
last_entry,
bank.last_blockhash(),
shred_index,
slot_full,
);
datapoint_error!(
"replay-stage-block-error",
("slot", bank.slot(), i64),
("last_entry", last_entry.to_string(), String),
);
Err(Error::BlockError(block_error))
};
if let Err(block_error) = Self::verify_ticks(bank, entries, slot_full, tick_hash_count) {
return handle_block_error(block_error);
}
datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
let mut verify_total = Measure::start("verify_and_process_entries");
let mut entry_state = entries.start_verify(last_entry, recyclers.clone());
if entry_state.status() == EntryVerificationStatus::Failure {
return handle_block_error(BlockError::InvalidEntryHash);
}
let mut replay_elapsed = Measure::start("replay_elapsed");
let res =
blockstore_processor::process_entries(bank, entries, true, transaction_status_sender);
replay_elapsed.stop();
bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
if !entry_state.finish_verify(entries) {
return handle_block_error(BlockError::InvalidEntryHash);
}
verify_total.stop();
bank_progress.stats.entry_verification_elapsed =
verify_total.as_us() - replay_elapsed.as_us();
res?;
Ok(())
}
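
Note the ordering in the removed body above: start_verify begins entry verification, process_entries replays transactions while that verification is in flight, and finish_verify collects the result afterwards, which is why entry_verification_elapsed is computed as the total measure minus the replay time. A toy model of that overlap using a bare thread (the real pipeline uses VerifyRecyclers and batched PoH verification, not a spawn):

use std::thread;

fn start_verify(entries: Vec<u64>) -> thread::JoinHandle<bool> {
    // Stand-in check; the real start_verify validates the PoH hash chain.
    thread::spawn(move || entries.iter().all(|&e| e != 0))
}

fn main() {
    let entries = vec![1u64, 2, 3];
    let verify_handle = start_verify(entries.clone());

    // "Replay" proceeds while verification runs in the background.
    let replayed: u64 = entries.iter().sum();

    let verified = verify_handle.join().unwrap(); // finish_verify analogue
    assert!(verified);
    assert_eq!(replayed, 6);
}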
fn handle_new_root(
bank_forks: &Arc<RwLock<BankForks>>,
progress: &mut HashMap<u64, ForkProgress>,
@@ -1192,10 +999,11 @@ pub(crate) mod tests {
use crossbeam_channel::unbounded;
use solana_client::rpc_request::RpcEncodedTransaction;
use solana_ledger::{
block_error::BlockError,
blockstore::make_slot_entries,
blockstore::{entries_to_test_shreds, BlockstoreError},
create_new_tmp_ledger,
entry::{self, next_entry},
entry::{self, next_entry, Entry},
get_tmp_ledger_path,
shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
@@ -1328,12 +1136,7 @@ pub(crate) mod tests {
for fork_progress in fork_progresses.iter_mut() {
fork_progress
.entry(neutral_fork.fork[0])
.or_insert_with(|| {
ForkProgress::new(
bank_forks.banks[&0].slot(),
bank_forks.banks[&0].last_blockhash(),
)
});
.or_insert_with(|| ForkProgress::new(bank_forks.banks[&0].last_blockhash()));
}
for index in 1..neutral_fork.fork.len() {
@@ -1360,7 +1163,6 @@ pub(crate) mod tests {
.entry(bank_forks.banks[&neutral_fork.fork[index]].slot())
.or_insert_with(|| {
ForkProgress::new(
bank_forks.banks[&neutral_fork.fork[index]].slot(),
bank_forks.banks[&neutral_fork.fork[index]].last_blockhash(),
)
});
@@ -1404,7 +1206,6 @@ pub(crate) mod tests {
.entry(bank_forks.banks[&fork_info.fork[index]].slot())
.or_insert_with(|| {
ForkProgress::new(
bank_forks.banks[&fork_info.fork[index]].slot(),
bank_forks.banks[&fork_info.fork[index]].last_blockhash(),
)
});
@@ -1551,7 +1352,7 @@ pub(crate) mod tests {
let bank0 = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
let mut progress = HashMap::new();
progress.insert(5, ForkProgress::new(0, Hash::default()));
progress.insert(5, ForkProgress::new(Hash::default()));
ReplayStage::handle_new_root(&bank_forks, &mut progress);
assert!(progress.is_empty());
}
@@ -1585,7 +1386,9 @@ pub(crate) mod tests {
assert_matches!(
res,
Err(Error::TransactionError(TransactionError::AccountNotFound))
Err(BlockstoreProcessorError::InvalidTransaction(
TransactionError::AccountNotFound
))
);
}
@@ -1611,7 +1414,7 @@ pub(crate) mod tests {
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidEntryHash);
} else {
assert!(false);
@@ -1636,7 +1439,7 @@ pub(crate) mod tests {
)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickHashCount);
} else {
assert!(false);
@@ -1659,7 +1462,7 @@ pub(crate) mod tests {
)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickCount);
} else {
assert!(false);
@@ -1679,7 +1482,7 @@ pub(crate) mod tests {
)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickCount);
} else {
assert!(false);
@@ -1701,7 +1504,7 @@ pub(crate) mod tests {
)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidLastTick);
} else {
assert!(false);
@@ -1725,7 +1528,7 @@ pub(crate) mod tests {
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0)
});
if let Err(Error::BlockError(block_error)) = res {
if let Err(BlockstoreProcessorError::InvalidBlock(block_error)) = res {
assert_eq!(block_error, BlockError::TrailingEntry);
} else {
assert!(false);
@@ -1755,13 +1558,15 @@ pub(crate) mod tests {
assert_matches!(
res,
Err(Error::BlockstoreError(BlockstoreError::InvalidShredData(_)))
Err(
BlockstoreProcessorError::FailedToLoadEntries(BlockstoreError::InvalidShredData(_)),
)
);
}
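
Taken together, the updated assertions in these tests pin down the surface of the new error type: InvalidTransaction wraps a TransactionError, InvalidBlock wraps a BlockError, and FailedToLoadEntries wraps a BlockstoreError. A hypothetical reconstruction from those matches alone, with the inner enums trimmed to the variants this diff exercises and the InvalidShredData payload reduced to a String placeholder:

// Stand-ins for the error types these tests exercise.
#[derive(Debug, PartialEq)]
enum BlockError {
    InvalidEntryHash,
    InvalidTickCount,
    InvalidTickHashCount,
    InvalidLastTick,
    TrailingEntry,
}

#[derive(Debug, PartialEq)]
enum TransactionError {
    AccountNotFound,
}

#[derive(Debug)]
enum BlockstoreError {
    InvalidShredData(String), // payload type is a placeholder
}

// Hypothetical shape of the consolidated error enum, inferred from the
// variants matched across the updated tests; the real definition lives in
// blockstore_processor.
#[derive(Debug)]
enum BlockstoreProcessorError {
    FailedToLoadEntries(BlockstoreError),
    InvalidBlock(BlockError),
    InvalidTransaction(TransactionError),
}

fn main() {
    let err = BlockstoreProcessorError::InvalidBlock(BlockError::TrailingEntry);
    if let BlockstoreProcessorError::InvalidBlock(block_error) = err {
        assert_eq!(block_error, BlockError::TrailingEntry);
    }

    let tx_err = BlockstoreProcessorError::InvalidTransaction(TransactionError::AccountNotFound);
    assert!(matches!(tx_err, BlockstoreProcessorError::InvalidTransaction(_)));

    let load_err = BlockstoreProcessorError::FailedToLoadEntries(
        BlockstoreError::InvalidShredData("bad shred".to_string()),
    );
    assert!(matches!(load_err, BlockstoreProcessorError::FailedToLoadEntries(_)));
}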
// Given a shred and an expected fatal error, check that replaying that shred causes the fork to be
// marked as dead. Returns the error for the caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
fn check_dead_fork<F>(shred_to_insert: F) -> result::Result<(), BlockstoreProcessorError>
where
F: Fn(&Keypair, Arc<Bank>) -> Vec<Shred>,
{
@@ -1782,10 +1587,10 @@ pub(crate) mod tests {
let last_blockhash = bank0.last_blockhash();
let mut bank0_progress = progress
.entry(bank0.slot())
.or_insert_with(|| ForkProgress::new(0, last_blockhash));
.or_insert_with(|| ForkProgress::new(last_blockhash));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blockstore.insert_shreds(shreds, None, false).unwrap();
let (res, _tx_count) = ReplayStage::replay_blockstore_into_bank(
let res = ReplayStage::replay_blockstore_into_bank(
&bank0,
&blockstore,
&mut bank0_progress,
@@ -1801,7 +1606,7 @@ pub(crate) mod tests {
// Check that the erroring bank was marked as dead in blockstore
assert!(blockstore.is_dead(bank0.slot()));
res
res.map(|_| ())
};
let _ignored = remove_dir_all(&ledger_path);
res