diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 21508def97..ad189cb512 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -55,6 +55,7 @@ use std::{ }, time::Duration, }; +use thiserror::Error; use trees::{Tree, TreeWalk}; pub mod blockstore_purge; @@ -92,6 +93,19 @@ pub enum PurgeType { PrimaryIndex, } +#[derive(Error, Debug)] +pub enum InsertDataShredError { + Exists, + InvalidShred, + BlockstoreError(#[from] BlockstoreError), +} + +impl std::fmt::Display for InsertDataShredError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "insert data shred error") + } +} + // ledger window pub struct Blockstore { db: Arc, @@ -143,6 +157,10 @@ pub struct BlockstoreInsertionMetrics { pub total_elapsed: u64, pub num_inserted: u64, pub num_recovered: usize, + pub num_recovered_inserted: usize, + pub num_recovered_failed_sig: usize, + pub num_recovered_failed_invalid: usize, + pub num_recovered_exists: usize, pub index_meta_time: u64, } @@ -182,6 +200,26 @@ impl BlockstoreInsertionMetrics { ("write_batch_elapsed", self.write_batch_elapsed as i64, i64), ("num_inserted", self.num_inserted as i64, i64), ("num_recovered", self.num_recovered as i64, i64), + ( + "num_recovered_inserted", + self.num_recovered_inserted as i64, + i64 + ), + ( + "num_recovered_failed_sig", + self.num_recovered_failed_sig as i64, + i64 + ), + ( + "num_recovered_failed_invalid", + self.num_recovered_failed_invalid as i64, + i64 + ), + ( + "num_recovered_exists", + self.num_recovered_exists as i64, + i64 + ), ); } } @@ -669,19 +707,22 @@ impl Blockstore { let mut index_meta_time = 0; shreds.into_iter().for_each(|shred| { if shred.is_data() { - if self.check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - handle_duplicate, - leader_schedule, - false, - ) { + if self + .check_insert_data_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + handle_duplicate, + leader_schedule, + false, + ) + .is_ok() + { num_inserted += 1; } } else if shred.is_code() { @@ -702,6 +743,10 @@ impl Blockstore { let insert_shreds_elapsed = start.as_us(); let mut start = Measure::start("Shred recovery"); let mut num_recovered = 0; + let mut num_recovered_inserted = 0; + let mut num_recovered_failed_sig = 0; + let mut num_recovered_failed_invalid = 0; + let mut num_recovered_exists = 0; if let Some(leader_schedule_cache) = leader_schedule { let recovered_data = Self::try_shred_recovery( &db, @@ -715,7 +760,7 @@ impl Blockstore { recovered_data.into_iter().for_each(|shred| { if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { if shred.verify(&leader) { - self.check_insert_data_shred( + match self.check_insert_data_shred( shred, &mut erasure_metas, &mut index_working_set, @@ -727,7 +772,20 @@ impl Blockstore { &handle_duplicate, leader_schedule, true, - ); + ) { + Err(InsertDataShredError::Exists) => { + num_recovered_exists += 1; + } + Err(InsertDataShredError::InvalidShred) => { + num_recovered_failed_invalid += 1; + } + Err(InsertDataShredError::BlockstoreError(_)) => {} + Ok(()) => { + num_recovered_inserted += 1; + } + } + } else { + num_recovered_failed_sig += 1; } } }); @@ -797,6 +855,10 @@ impl Blockstore { metrics.write_batch_elapsed += write_batch_elapsed; metrics.num_inserted += num_inserted; metrics.num_recovered += num_recovered; + metrics.num_recovered_inserted += num_recovered_inserted; + metrics.num_recovered_failed_sig += num_recovered_failed_sig; + metrics.num_recovered_failed_invalid = num_recovered_failed_invalid; + metrics.num_recovered_exists = num_recovered_exists; metrics.index_meta_time += index_meta_time; Ok(()) @@ -941,7 +1003,7 @@ impl Blockstore { handle_duplicate: &F, leader_schedule: Option<&Arc>, is_recovered: bool, - ) -> bool + ) -> std::result::Result<(), InsertDataShredError> where F: Fn(Shred), { @@ -960,7 +1022,7 @@ impl Blockstore { if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) { handle_duplicate(shred); - return false; + return Err(InsertDataShredError::Exists); } else if !Blockstore::should_insert_data_shred( &shred, slot_meta, @@ -968,30 +1030,25 @@ impl Blockstore { leader_schedule, is_recovered, ) { - return false; + return Err(InsertDataShredError::InvalidShred); } } let set_index = u64::from(shred.common_header.fec_set_index); - if let Ok(()) = - self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) - { - just_inserted_data_shreds.insert((slot, shred_index), shred); - index_meta_working_set_entry.did_insert_occur = true; - slot_meta_entry.did_insert_occur = true; - if !erasure_metas.contains_key(&(slot, set_index)) { - if let Some(meta) = self - .erasure_meta_cf - .get((slot, set_index)) - .expect("Expect database get to succeed") - { - erasure_metas.insert((slot, set_index), meta); - } + self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?; + just_inserted_data_shreds.insert((slot, shred_index), shred); + index_meta_working_set_entry.did_insert_occur = true; + slot_meta_entry.did_insert_occur = true; + if !erasure_metas.contains_key(&(slot, set_index)) { + if let Some(meta) = self + .erasure_meta_cf + .get((slot, set_index)) + .expect("Expect database get to succeed") + { + erasure_metas.insert((slot, set_index), meta); } - true - } else { - false } + Ok(()) } fn should_insert_coding_shred(