* adds ErasureSetId identifying erasure coding sets of shreds (#21928)
(cherry picked from commit 8183f28636
)
# Conflicts:
# ledger/src/blockstore.rs
* removes backport merge conflicts
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -12,7 +12,10 @@ use {
|
||||
blockstore_meta::*,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
next_slots_iterator::NextSlotsIterator,
|
||||
shred::{Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
||||
shred::{
|
||||
ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType, Shredder,
|
||||
SHRED_PAYLOAD_SIZE,
|
||||
},
|
||||
},
|
||||
bincode::deserialize,
|
||||
log::*,
|
||||
@ -548,8 +551,8 @@ impl Blockstore {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
|
||||
self.erasure_meta_cf.get((slot, set_index))
|
||||
fn erasure_meta(&self, erasure_set: ErasureSetId) -> Result<Option<ErasureMeta>> {
|
||||
self.erasure_meta_cf.get(erasure_set.store_key())
|
||||
}
|
||||
|
||||
pub fn orphan(&self, slot: Slot) -> Result<Option<bool>> {
|
||||
@ -728,7 +731,7 @@ impl Blockstore {
|
||||
|
||||
fn try_shred_recovery(
|
||||
db: &Database,
|
||||
erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
prev_inserted_shreds: &HashMap<ShredId, Shred>,
|
||||
) -> Vec<Shred> {
|
||||
@ -740,7 +743,8 @@ impl Blockstore {
|
||||
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
|
||||
// 3. Before trying recovery, check if enough number of shreds have been received
|
||||
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
|
||||
for (&(slot, _fec_set_index), erasure_meta) in erasure_metas.iter() {
|
||||
for (erasure_set, erasure_meta) in erasure_metas.iter() {
|
||||
let slot = erasure_set.slot();
|
||||
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
|
||||
let index = &mut index_meta_entry.index;
|
||||
match erasure_meta.status(index) {
|
||||
@ -937,8 +941,8 @@ impl Blockstore {
|
||||
&mut write_batch,
|
||||
)?;
|
||||
|
||||
for ((slot, set_index), erasure_meta) in erasure_metas {
|
||||
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
||||
for (erasure_set, erasure_meta) in erasure_metas {
|
||||
write_batch.put::<cf::ErasureMeta>(erasure_set.store_key(), &erasure_meta)?;
|
||||
}
|
||||
|
||||
for (&slot, index_working_set_entry) in index_working_set.iter() {
|
||||
@ -1022,7 +1026,7 @@ impl Blockstore {
|
||||
fn check_insert_coding_shred<F>(
|
||||
&self,
|
||||
shred: Shred,
|
||||
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
write_batch: &mut WriteBatch,
|
||||
just_received_shreds: &mut HashMap<ShredId, Shred>,
|
||||
@ -1059,9 +1063,9 @@ impl Blockstore {
|
||||
}
|
||||
}
|
||||
|
||||
let set_index = u64::from(shred.fec_set_index());
|
||||
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
|
||||
self.erasure_meta(slot, set_index)
|
||||
let erasure_set = shred.erasure_set();
|
||||
let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| {
|
||||
self.erasure_meta(erasure_set)
|
||||
.expect("Expect database get to succeed")
|
||||
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
|
||||
});
|
||||
@ -1090,8 +1094,8 @@ impl Blockstore {
|
||||
// ToDo: This is a potential slashing condition
|
||||
warn!("Received multiple erasure configs for the same erasure set!!!");
|
||||
warn!(
|
||||
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
|
||||
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
|
||||
"Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
|
||||
slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
|
||||
);
|
||||
|
||||
return false;
|
||||
@ -1153,7 +1157,7 @@ impl Blockstore {
|
||||
fn check_insert_data_shred<F>(
|
||||
&self,
|
||||
shred: Shred,
|
||||
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
|
||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||
write_batch: &mut WriteBatch,
|
||||
@ -1217,7 +1221,7 @@ impl Blockstore {
|
||||
}
|
||||
}
|
||||
|
||||
let set_index = u64::from(shred.fec_set_index());
|
||||
let erasure_set = shred.erasure_set();
|
||||
let newly_completed_data_sets = self.insert_data_shred(
|
||||
slot_meta,
|
||||
index_meta.data_mut(),
|
||||
@ -1228,8 +1232,8 @@ impl Blockstore {
|
||||
just_inserted_shreds.insert(shred.id(), shred);
|
||||
index_meta_working_set_entry.did_insert_occur = true;
|
||||
slot_meta_entry.did_insert_occur = true;
|
||||
if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
|
||||
if let Some(meta) = self.erasure_meta(slot, set_index).unwrap() {
|
||||
if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) {
|
||||
if let Some(meta) = self.erasure_meta(erasure_set).unwrap() {
|
||||
entry.insert(meta);
|
||||
}
|
||||
}
|
||||
|
@ -236,7 +236,7 @@ pub struct Shred {
|
||||
pub payload: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Tuple which should uniquely identify a shred if it exists.
|
||||
/// Tuple which uniquely identifies a shred should it exists.
|
||||
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
|
||||
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
|
||||
|
||||
@ -250,6 +250,21 @@ impl ShredId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tuple which identifies erasure coding set that the shred belongs to.
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub(crate) struct ErasureSetId(Slot, /*fec_set_index:*/ u32);
|
||||
|
||||
impl ErasureSetId {
|
||||
pub(crate) fn slot(&self) -> Slot {
|
||||
self.0
|
||||
}
|
||||
|
||||
// Storage key for ErasureMeta in blockstore db.
|
||||
pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) {
|
||||
(self.0, u64::from(self.1))
|
||||
}
|
||||
}
|
||||
|
||||
impl Shred {
|
||||
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
|
||||
where
|
||||
@ -518,6 +533,11 @@ impl Shred {
|
||||
self.common_header.version
|
||||
}
|
||||
|
||||
// Identifier for the erasure coding set that the shred belongs to.
|
||||
pub(crate) fn erasure_set(&self) -> ErasureSetId {
|
||||
ErasureSetId(self.slot(), self.fec_set_index())
|
||||
}
|
||||
|
||||
// Returns the block index within the erasure coding set.
|
||||
fn erasure_block_index(&self) -> Option<usize> {
|
||||
let index = self.index().checked_sub(self.fec_set_index())?;
|
||||
|
Reference in New Issue
Block a user