Add breakdown of erasure blobs (#10912)
Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
@ -55,6 +55,7 @@ use std::{
|
|||||||
},
|
},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
use thiserror::Error;
|
||||||
use trees::{Tree, TreeWalk};
|
use trees::{Tree, TreeWalk};
|
||||||
|
|
||||||
pub mod blockstore_purge;
|
pub mod blockstore_purge;
|
||||||
@ -92,6 +93,19 @@ pub enum PurgeType {
|
|||||||
PrimaryIndex,
|
PrimaryIndex,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum InsertDataShredError {
|
||||||
|
Exists,
|
||||||
|
InvalidShred,
|
||||||
|
BlockstoreError(#[from] BlockstoreError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for InsertDataShredError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "insert data shred error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ledger window
|
// ledger window
|
||||||
pub struct Blockstore {
|
pub struct Blockstore {
|
||||||
db: Arc<Database>,
|
db: Arc<Database>,
|
||||||
@ -143,6 +157,10 @@ pub struct BlockstoreInsertionMetrics {
|
|||||||
pub total_elapsed: u64,
|
pub total_elapsed: u64,
|
||||||
pub num_inserted: u64,
|
pub num_inserted: u64,
|
||||||
pub num_recovered: usize,
|
pub num_recovered: usize,
|
||||||
|
pub num_recovered_inserted: usize,
|
||||||
|
pub num_recovered_failed_sig: usize,
|
||||||
|
pub num_recovered_failed_invalid: usize,
|
||||||
|
pub num_recovered_exists: usize,
|
||||||
pub index_meta_time: u64,
|
pub index_meta_time: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,6 +200,26 @@ impl BlockstoreInsertionMetrics {
|
|||||||
("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
|
("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
|
||||||
("num_inserted", self.num_inserted as i64, i64),
|
("num_inserted", self.num_inserted as i64, i64),
|
||||||
("num_recovered", self.num_recovered as i64, i64),
|
("num_recovered", self.num_recovered as i64, i64),
|
||||||
|
(
|
||||||
|
"num_recovered_inserted",
|
||||||
|
self.num_recovered_inserted as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"num_recovered_failed_sig",
|
||||||
|
self.num_recovered_failed_sig as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"num_recovered_failed_invalid",
|
||||||
|
self.num_recovered_failed_invalid as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"num_recovered_exists",
|
||||||
|
self.num_recovered_exists as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -669,19 +707,22 @@ impl Blockstore {
|
|||||||
let mut index_meta_time = 0;
|
let mut index_meta_time = 0;
|
||||||
shreds.into_iter().for_each(|shred| {
|
shreds.into_iter().for_each(|shred| {
|
||||||
if shred.is_data() {
|
if shred.is_data() {
|
||||||
if self.check_insert_data_shred(
|
if self
|
||||||
shred,
|
.check_insert_data_shred(
|
||||||
&mut erasure_metas,
|
shred,
|
||||||
&mut index_working_set,
|
&mut erasure_metas,
|
||||||
&mut slot_meta_working_set,
|
&mut index_working_set,
|
||||||
&mut write_batch,
|
&mut slot_meta_working_set,
|
||||||
&mut just_inserted_data_shreds,
|
&mut write_batch,
|
||||||
&mut index_meta_time,
|
&mut just_inserted_data_shreds,
|
||||||
is_trusted,
|
&mut index_meta_time,
|
||||||
handle_duplicate,
|
is_trusted,
|
||||||
leader_schedule,
|
handle_duplicate,
|
||||||
false,
|
leader_schedule,
|
||||||
) {
|
false,
|
||||||
|
)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
num_inserted += 1;
|
num_inserted += 1;
|
||||||
}
|
}
|
||||||
} else if shred.is_code() {
|
} else if shred.is_code() {
|
||||||
@ -702,6 +743,10 @@ impl Blockstore {
|
|||||||
let insert_shreds_elapsed = start.as_us();
|
let insert_shreds_elapsed = start.as_us();
|
||||||
let mut start = Measure::start("Shred recovery");
|
let mut start = Measure::start("Shred recovery");
|
||||||
let mut num_recovered = 0;
|
let mut num_recovered = 0;
|
||||||
|
let mut num_recovered_inserted = 0;
|
||||||
|
let mut num_recovered_failed_sig = 0;
|
||||||
|
let mut num_recovered_failed_invalid = 0;
|
||||||
|
let mut num_recovered_exists = 0;
|
||||||
if let Some(leader_schedule_cache) = leader_schedule {
|
if let Some(leader_schedule_cache) = leader_schedule {
|
||||||
let recovered_data = Self::try_shred_recovery(
|
let recovered_data = Self::try_shred_recovery(
|
||||||
&db,
|
&db,
|
||||||
@ -715,7 +760,7 @@ impl Blockstore {
|
|||||||
recovered_data.into_iter().for_each(|shred| {
|
recovered_data.into_iter().for_each(|shred| {
|
||||||
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
|
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
|
||||||
if shred.verify(&leader) {
|
if shred.verify(&leader) {
|
||||||
self.check_insert_data_shred(
|
match self.check_insert_data_shred(
|
||||||
shred,
|
shred,
|
||||||
&mut erasure_metas,
|
&mut erasure_metas,
|
||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
@ -727,7 +772,20 @@ impl Blockstore {
|
|||||||
&handle_duplicate,
|
&handle_duplicate,
|
||||||
leader_schedule,
|
leader_schedule,
|
||||||
true,
|
true,
|
||||||
);
|
) {
|
||||||
|
Err(InsertDataShredError::Exists) => {
|
||||||
|
num_recovered_exists += 1;
|
||||||
|
}
|
||||||
|
Err(InsertDataShredError::InvalidShred) => {
|
||||||
|
num_recovered_failed_invalid += 1;
|
||||||
|
}
|
||||||
|
Err(InsertDataShredError::BlockstoreError(_)) => {}
|
||||||
|
Ok(()) => {
|
||||||
|
num_recovered_inserted += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
num_recovered_failed_sig += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -797,6 +855,10 @@ impl Blockstore {
|
|||||||
metrics.write_batch_elapsed += write_batch_elapsed;
|
metrics.write_batch_elapsed += write_batch_elapsed;
|
||||||
metrics.num_inserted += num_inserted;
|
metrics.num_inserted += num_inserted;
|
||||||
metrics.num_recovered += num_recovered;
|
metrics.num_recovered += num_recovered;
|
||||||
|
metrics.num_recovered_inserted += num_recovered_inserted;
|
||||||
|
metrics.num_recovered_failed_sig += num_recovered_failed_sig;
|
||||||
|
metrics.num_recovered_failed_invalid = num_recovered_failed_invalid;
|
||||||
|
metrics.num_recovered_exists = num_recovered_exists;
|
||||||
metrics.index_meta_time += index_meta_time;
|
metrics.index_meta_time += index_meta_time;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -941,7 +1003,7 @@ impl Blockstore {
|
|||||||
handle_duplicate: &F,
|
handle_duplicate: &F,
|
||||||
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
||||||
is_recovered: bool,
|
is_recovered: bool,
|
||||||
) -> bool
|
) -> std::result::Result<(), InsertDataShredError>
|
||||||
where
|
where
|
||||||
F: Fn(Shred),
|
F: Fn(Shred),
|
||||||
{
|
{
|
||||||
@ -960,7 +1022,7 @@ impl Blockstore {
|
|||||||
if !is_trusted {
|
if !is_trusted {
|
||||||
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
|
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
|
||||||
handle_duplicate(shred);
|
handle_duplicate(shred);
|
||||||
return false;
|
return Err(InsertDataShredError::Exists);
|
||||||
} else if !Blockstore::should_insert_data_shred(
|
} else if !Blockstore::should_insert_data_shred(
|
||||||
&shred,
|
&shred,
|
||||||
slot_meta,
|
slot_meta,
|
||||||
@ -968,30 +1030,25 @@ impl Blockstore {
|
|||||||
leader_schedule,
|
leader_schedule,
|
||||||
is_recovered,
|
is_recovered,
|
||||||
) {
|
) {
|
||||||
return false;
|
return Err(InsertDataShredError::InvalidShred);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let set_index = u64::from(shred.common_header.fec_set_index);
|
let set_index = u64::from(shred.common_header.fec_set_index);
|
||||||
if let Ok(()) =
|
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?;
|
||||||
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
|
just_inserted_data_shreds.insert((slot, shred_index), shred);
|
||||||
{
|
index_meta_working_set_entry.did_insert_occur = true;
|
||||||
just_inserted_data_shreds.insert((slot, shred_index), shred);
|
slot_meta_entry.did_insert_occur = true;
|
||||||
index_meta_working_set_entry.did_insert_occur = true;
|
if !erasure_metas.contains_key(&(slot, set_index)) {
|
||||||
slot_meta_entry.did_insert_occur = true;
|
if let Some(meta) = self
|
||||||
if !erasure_metas.contains_key(&(slot, set_index)) {
|
.erasure_meta_cf
|
||||||
if let Some(meta) = self
|
.get((slot, set_index))
|
||||||
.erasure_meta_cf
|
.expect("Expect database get to succeed")
|
||||||
.get((slot, set_index))
|
{
|
||||||
.expect("Expect database get to succeed")
|
erasure_metas.insert((slot, set_index), meta);
|
||||||
{
|
|
||||||
erasure_metas.insert((slot, set_index), meta);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_insert_coding_shred(
|
fn should_insert_coding_shred(
|
||||||
|
Reference in New Issue
Block a user