Verify number of hashes for each block of entries (#6262)

* Verify number of hashes for each block of entries

* Fix blocktree processor tick check

* Rebase once more
This commit is contained in:
Justin Starry
2019-10-31 16:38:50 -04:00
committed by GitHub
parent 111942a47d
commit e8e5ddc55d
19 changed files with 537 additions and 139 deletions

View File

@ -134,7 +134,7 @@ mod test {
let (slot_full_sender, slot_full_receiver) = channel();
// Create entries - 4 ticks + 1 populated entry + 1 tick
let mut entries = create_ticks(4, Hash::default());
let mut entries = create_ticks(4, 0, Hash::default());
let keypair = Keypair::new();
let mut blockhash = entries[3].hash;
@ -142,7 +142,7 @@ mod test {
let entry = Entry::new(&mut blockhash, 1, vec![tx]);
blockhash = entry.hash;
entries.push(entry);
let final_tick = create_ticks(1, blockhash);
let final_tick = create_ticks(1, 0, blockhash);
entries.extend_from_slice(&final_tick);
let expected_entries = entries.clone();

View File

@ -281,7 +281,7 @@ mod test {
max_tick_height = bank.max_tick_height();
ticks_per_slot = bank.ticks_per_slot();
slot = bank.slot();
let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
let ticks = create_ticks(max_tick_height - start_tick_height, 0, Hash::default());
for (i, tick) in ticks.into_iter().enumerate() {
entry_sender
.send((bank.clone(), (tick, i as u64 + 1)))

View File

@ -334,7 +334,7 @@ mod test {
setup(num_shreds_per_slot);
// Insert 1 less than the number of ticks needed to finish the slot
let ticks = create_ticks(genesis_block.ticks_per_slot - 1, genesis_block.hash());
let ticks = create_ticks(genesis_block.ticks_per_slot - 1, 0, genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),
@ -372,7 +372,7 @@ mod test {
// Interrupting the slot should cause the unfinished_slot and stats to reset
let num_shreds = 1;
assert!(num_shreds < num_shreds_per_slot);
let ticks = create_ticks(max_ticks_per_n_shreds(num_shreds), genesis_block.hash());
let ticks = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(2, 0),
@ -401,7 +401,7 @@ mod test {
setup(num_shreds_per_slot);
// Insert complete slot of ticks needed to finish the slot
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
let ticks = create_ticks(genesis_block.ticks_per_slot, 0, genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),

View File

@ -131,7 +131,7 @@ mod tests {
}
let slots_per_segment = 32;
let entries = create_ticks(slots_per_segment, Hash::default());
let entries = create_ticks(slots_per_segment, 0, Hash::default());
let ledger_dir = "test_encrypt_file_many_keys_single";
let ledger_path = get_tmp_ledger_path(ledger_dir);
let ticks_per_slot = 16;
@ -196,7 +196,7 @@ mod tests {
let ledger_dir = "test_encrypt_file_many_keys_multiple";
let ledger_path = get_tmp_ledger_path(ledger_dir);
let ticks_per_slot = 90;
let entries = create_ticks(2 * ticks_per_slot, Hash::default());
let entries = create_ticks(2 * ticks_per_slot, 0, Hash::default());
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
blocktree
.write_entries(

View File

@ -160,14 +160,6 @@ impl fmt::Debug for Blob {
}
}
#[derive(Debug)]
pub enum BlobError {
/// the Blob's meta and data are not self-consistent
BadState,
/// Blob verification failed
VerificationFailed,
}
impl Packets {
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
let mut i = 0;

View File

@ -5,12 +5,12 @@ use crate::confidence::{
AggregateConfidenceService, ConfidenceAggregationData, ForkConfidenceCache,
};
use crate::consensus::{StakeLockout, Tower};
use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::block_error::BlockError;
use solana_ledger::blocktree::{Blocktree, BlocktreeError};
use solana_ledger::blocktree_processor;
use solana_ledger::entry::{Entry, EntrySlice};
@ -113,6 +113,7 @@ struct ForkProgress {
last_entry: Hash,
num_shreds: usize,
num_entries: usize,
tick_hash_count: u64,
started_ms: u64,
is_dead: bool,
stats: ReplaySlotStats,
@ -124,6 +125,7 @@ impl ForkProgress {
last_entry,
num_shreds: 0,
num_entries: 0,
tick_hash_count: 0,
started_ms: timing::timestamp(),
is_dead: false,
stats: ReplaySlotStats::new(slot),
@ -399,7 +401,7 @@ impl ReplayStage {
let tx_error = Err(e.clone());
!Bank::can_commit(&tx_error)
}
Err(Error::BlobError(BlobError::VerificationFailed)) => true,
Err(Error::BlockError(_)) => true,
Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(_))) => true,
_ => false,
}
@ -759,25 +761,46 @@ impl ReplayStage {
result
}
fn verify_ticks(
bank: &Arc<Bank>,
entries: &[Entry],
tick_hash_count: &mut u64,
) -> std::result::Result<(), BlockError> {
if entries.is_empty() {
return Ok(());
}
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
return Err(BlockError::InvalidTickHashCount);
}
let next_bank_tick_height = bank.tick_height() + entries.tick_count();
let max_bank_tick_height = bank.max_tick_height();
if next_bank_tick_height > max_bank_tick_height {
return Err(BlockError::InvalidTickCount);
}
let has_trailing_entry = !entries.last().unwrap().is_tick();
if next_bank_tick_height == max_bank_tick_height && has_trailing_entry {
return Err(BlockError::TrailingEntry);
}
Ok(())
}
fn verify_and_process_entries(
bank: &Arc<Bank>,
entries: &[Entry],
shred_index: usize,
bank_progress: &mut ForkProgress,
) -> Result<()> {
datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
let mut verify_total = Measure::start("verify_and_process_entries");
let last_entry = &bank_progress.last_entry;
let mut entry_state = entries.start_verify(last_entry);
let mut replay_elapsed = Measure::start("replay_elapsed");
let res = blocktree_processor::process_entries(bank, entries, true);
replay_elapsed.stop();
bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
if !entry_state.finish_verify(entries) {
info!(
"entry verification failed, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}",
let tick_hash_count = &mut bank_progress.tick_hash_count;
let handle_block_error = move |block_error: BlockError| -> Result<()> {
warn!(
"{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}",
block_error,
bank.slot(),
entries.len(),
bank.tick_height(),
@ -791,8 +814,27 @@ impl ReplayStage {
("slot", bank.slot(), i64),
("last_entry", last_entry.to_string(), String),
);
return Err(Error::BlobError(BlobError::VerificationFailed));
Err(Error::BlockError(block_error))
};
if let Err(block_error) = Self::verify_ticks(bank, entries, tick_hash_count) {
return handle_block_error(block_error);
}
datapoint_info!("verify-batch-size", ("size", entries.len() as i64, i64));
let mut verify_total = Measure::start("verify_and_process_entries");
let mut entry_state = entries.start_verify(last_entry);
let mut replay_elapsed = Measure::start("replay_elapsed");
let res = blocktree_processor::process_entries(bank, entries, true);
replay_elapsed.stop();
bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
if !entry_state.finish_verify(entries) {
return handle_block_error(BlockError::InvalidEntryHash);
}
verify_total.stop();
bank_progress.stats.entry_verification_elapsed =
verify_total.as_us() - replay_elapsed.as_us();
@ -951,17 +993,20 @@ mod test {
let missing_keypair = Keypair::new();
let missing_keypair2 = Keypair::new();
let res = check_dead_fork(|_keypair, blockhash, slot| {
let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let entry = entry::next_entry(
blockhash,
1,
&blockhash,
hashes_per_tick.saturating_sub(1),
vec![
system_transaction::transfer(&keypair1, &keypair2.pubkey(), 2, *blockhash), // should be fine,
system_transaction::transfer(&keypair1, &keypair2.pubkey(), 2, blockhash), // should be fine,
system_transaction::transfer(
&missing_keypair,
&missing_keypair2.pubkey(),
2,
*blockhash,
blockhash,
), // should cause AccountNotFound error
],
);
@ -977,29 +1022,105 @@ mod test {
#[test]
fn test_dead_fork_entry_verification_failure() {
let keypair2 = Keypair::new();
let res = check_dead_fork(|genesis_keypair, blockhash, slot| {
let res = check_dead_fork(|genesis_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let bad_hash = hash(&[2; 30]);
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let entry = entry::next_entry(
// Use wrong blockhash so that the entry causes an entry verification failure
&bad_hash,
1,
hashes_per_tick.saturating_sub(1),
vec![system_transaction::transfer(
&genesis_keypair,
&keypair2.pubkey(),
2,
*blockhash,
blockhash,
)],
);
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false)
});
assert_matches!(res, Err(Error::BlobError(BlobError::VerificationFailed)));
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidEntryHash);
} else {
assert!(false);
}
}
#[test]
fn test_dead_fork_invalid_tick_hash_count() {
let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
assert!(hashes_per_tick > 0);
let too_few_hashes_tick = Entry::new(&blockhash, hashes_per_tick - 1, vec![]);
entries_to_test_shreds(
vec![too_few_hashes_tick],
slot,
slot.saturating_sub(1),
false,
)
});
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickHashCount);
} else {
assert!(false);
}
}
#[test]
fn test_dead_fork_invalid_slot_tick_count() {
let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
entries_to_test_shreds(
entry::create_ticks(bank.ticks_per_slot() + 1, hashes_per_tick, blockhash),
slot,
slot.saturating_sub(1),
false,
)
});
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickCount);
} else {
assert!(false);
}
}
#[test]
fn test_dead_fork_trailing_entry() {
let keypair = Keypair::new();
let res = check_dead_fork(|genesis_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let mut entries =
entry::create_ticks(bank.ticks_per_slot(), hashes_per_tick, blockhash.clone());
let last_entry_hash = entries.last().unwrap().hash;
let tx =
system_transaction::transfer(&genesis_keypair, &keypair.pubkey(), 2, blockhash);
let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]);
entries.push(trailing_entry);
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), false)
});
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::TrailingEntry);
} else {
assert!(false);
}
}
#[test]
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, _, _| {
let res = check_dead_fork(|_, _| {
let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD;
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut data_header = DataShredHeader::default();
@ -1027,7 +1148,7 @@ mod test {
// marked as dead. Returns the error for caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
where
F: Fn(&Keypair, &Hash, u64) -> Vec<Shred>,
F: Fn(&Keypair, Arc<Bank>) -> Vec<Shred>,
{
let ledger_path = get_tmp_ledger_path!();
let res = {
@ -1035,15 +1156,16 @@ mod test {
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let GenesisBlockInfo {
genesis_block,
mut genesis_block,
mint_keypair,
..
} = create_genesis_block(1000);
genesis_block.poh_config.hashes_per_tick = Some(2);
let bank0 = Arc::new(Bank::new(&genesis_block));
let mut progress = HashMap::new();
let last_blockhash = bank0.last_blockhash();
progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
let shreds = shred_to_insert(&mint_keypair, &last_blockhash, bank0.slot());
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blocktree.insert_shreds(shreds, None).unwrap();
let (res, _tx_count) =
ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);

View File

@ -1,8 +1,8 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.
use crate::cluster_info;
use crate::packet;
use crate::poh_recorder;
use solana_ledger::block_error;
use solana_ledger::blocktree;
use solana_ledger::snapshot_utils;
use solana_sdk::transaction;
@ -23,10 +23,10 @@ pub enum Error {
Serialize(std::boxed::Box<bincode::ErrorKind>),
TransactionError(transaction::TransactionError),
ClusterInfoError(cluster_info::ClusterInfoError),
BlobError(packet::BlobError),
ErasureError(reed_solomon_erasure::Error),
SendError,
PohRecorderError(poh_recorder::PohRecorderError),
BlockError(block_error::BlockError),
BlocktreeError(blocktree::BlocktreeError),
FsExtra(fs_extra::error::Error),
ToBlobError,

View File

@ -326,7 +326,7 @@ mod test {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
let num_entries = 10;
let original_entries = create_ticks(num_entries, Hash::default());
let original_entries = create_ticks(num_entries, 0, Hash::default());
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
shreds.reverse();
blocktree