* adds ShredId uniquely identifying each shred (#21820) (cherry picked from commit 4ceb2689f5337706ae42c27a54794bce9ae29443) # Conflicts: # ledger/src/blockstore.rs * removes backport merge conflicts Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
parent
2cd2f3ba7b
commit
2f1816d1db
@ -24,7 +24,7 @@ use {
|
|||||||
solana_ledger::{
|
solana_ledger::{
|
||||||
blockstore::Blockstore,
|
blockstore::Blockstore,
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
shred::{Shred, ShredType},
|
shred::{Shred, ShredId},
|
||||||
},
|
},
|
||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
solana_perf::packet::PacketBatch,
|
solana_perf::packet::PacketBatch,
|
||||||
@ -145,13 +145,13 @@ impl RetransmitStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Map of shred (slot, index, type) => list of hash values seen for that key.
|
// Map of shred (slot, index, type) => list of hash values seen for that key.
|
||||||
type ShredFilter = LruCache<(Slot, u32, ShredType), Vec<u64>>;
|
type ShredFilter = LruCache<ShredId, Vec<u64>>;
|
||||||
|
|
||||||
type ShredFilterAndHasher = (ShredFilter, PacketHasher);
|
type ShredFilterAndHasher = (ShredFilter, PacketHasher);
|
||||||
|
|
||||||
// Returns true if shred is already received and should skip retransmit.
|
// Returns true if shred is already received and should skip retransmit.
|
||||||
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
|
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
|
||||||
let key = (shred.slot(), shred.index(), shred.shred_type());
|
let key = shred.id();
|
||||||
let mut shreds_received = shreds_received.lock().unwrap();
|
let mut shreds_received = shreds_received.lock().unwrap();
|
||||||
let (cache, hasher) = shreds_received.deref_mut();
|
let (cache, hasher) = shreds_received.deref_mut();
|
||||||
match cache.get_mut(&key) {
|
match cache.get_mut(&key) {
|
||||||
|
@ -217,12 +217,9 @@ fn run_check_duplicate(
|
|||||||
let check_duplicate = |shred: Shred| -> Result<()> {
|
let check_duplicate = |shred: Shred| -> Result<()> {
|
||||||
let shred_slot = shred.slot();
|
let shred_slot = shred.slot();
|
||||||
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
|
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
|
||||||
if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(
|
if let Some(existing_shred_payload) =
|
||||||
shred_slot,
|
blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
|
||||||
shred.index(),
|
{
|
||||||
shred.payload.clone(),
|
|
||||||
shred.shred_type(),
|
|
||||||
) {
|
|
||||||
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
|
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
|
||||||
blockstore.store_duplicate_slot(
|
blockstore.store_duplicate_slot(
|
||||||
shred_slot,
|
shred_slot,
|
||||||
|
@ -12,7 +12,7 @@ use {
|
|||||||
blockstore_meta::*,
|
blockstore_meta::*,
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
next_slots_iterator::NextSlotsIterator,
|
next_slots_iterator::NextSlotsIterator,
|
||||||
shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
shred::{Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
||||||
},
|
},
|
||||||
bincode::deserialize,
|
bincode::deserialize,
|
||||||
log::*,
|
log::*,
|
||||||
@ -626,12 +626,13 @@ impl Blockstore {
|
|||||||
index: &'a Index,
|
index: &'a Index,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
erasure_meta: &'a ErasureMeta,
|
erasure_meta: &'a ErasureMeta,
|
||||||
prev_inserted_datas: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
|
||||||
data_cf: &'a LedgerColumn<cf::ShredData>,
|
data_cf: &'a LedgerColumn<cf::ShredData>,
|
||||||
) -> impl Iterator<Item = Shred> + 'a {
|
) -> impl Iterator<Item = Shred> + 'a {
|
||||||
erasure_meta.data_shreds_indices().filter_map(move |i| {
|
erasure_meta.data_shreds_indices().filter_map(move |i| {
|
||||||
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)) {
|
let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data);
|
||||||
return Some(shred);
|
if let Some(shred) = prev_inserted_shreds.get(&key) {
|
||||||
|
return Some(shred.clone());
|
||||||
}
|
}
|
||||||
if !index.data().is_present(i) {
|
if !index.data().is_present(i) {
|
||||||
return None;
|
return None;
|
||||||
@ -647,14 +648,15 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn get_recovery_coding_shreds<'a>(
|
fn get_recovery_coding_shreds<'a>(
|
||||||
index: &'a mut Index,
|
index: &'a Index,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
erasure_meta: &'a ErasureMeta,
|
erasure_meta: &'a ErasureMeta,
|
||||||
prev_inserted_codes: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
|
||||||
code_cf: &'a LedgerColumn<cf::ShredCode>,
|
code_cf: &'a LedgerColumn<cf::ShredCode>,
|
||||||
) -> impl Iterator<Item = Shred> + 'a {
|
) -> impl Iterator<Item = Shred> + 'a {
|
||||||
erasure_meta.coding_shreds_indices().filter_map(move |i| {
|
erasure_meta.coding_shreds_indices().filter_map(move |i| {
|
||||||
if let Some(shred) = prev_inserted_codes.get(&(slot, i)) {
|
let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code);
|
||||||
|
if let Some(shred) = prev_inserted_shreds.get(&key) {
|
||||||
return Some(shred.clone());
|
return Some(shred.clone());
|
||||||
}
|
}
|
||||||
if !index.coding().is_present(i) {
|
if !index.coding().is_present(i) {
|
||||||
@ -673,24 +675,28 @@ impl Blockstore {
|
|||||||
fn recover_shreds(
|
fn recover_shreds(
|
||||||
index: &mut Index,
|
index: &mut Index,
|
||||||
erasure_meta: &ErasureMeta,
|
erasure_meta: &ErasureMeta,
|
||||||
prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
prev_inserted_shreds: &HashMap<ShredId, Shred>,
|
||||||
prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
|
||||||
recovered_data_shreds: &mut Vec<Shred>,
|
recovered_data_shreds: &mut Vec<Shred>,
|
||||||
data_cf: &LedgerColumn<cf::ShredData>,
|
data_cf: &LedgerColumn<cf::ShredData>,
|
||||||
code_cf: &LedgerColumn<cf::ShredCode>,
|
code_cf: &LedgerColumn<cf::ShredCode>,
|
||||||
) {
|
) {
|
||||||
// Find shreds for this erasure set and try recovery
|
// Find shreds for this erasure set and try recovery
|
||||||
let slot = index.slot;
|
let slot = index.slot;
|
||||||
let mut available_shreds: Vec<_> =
|
let available_shreds: Vec<_> = Self::get_recovery_data_shreds(
|
||||||
Self::get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_datas, data_cf)
|
|
||||||
.collect();
|
|
||||||
available_shreds.extend(Self::get_recovery_coding_shreds(
|
|
||||||
index,
|
index,
|
||||||
slot,
|
slot,
|
||||||
erasure_meta,
|
erasure_meta,
|
||||||
prev_inserted_codes,
|
prev_inserted_shreds,
|
||||||
|
data_cf,
|
||||||
|
)
|
||||||
|
.chain(Self::get_recovery_coding_shreds(
|
||||||
|
index,
|
||||||
|
slot,
|
||||||
|
erasure_meta,
|
||||||
|
prev_inserted_shreds,
|
||||||
code_cf,
|
code_cf,
|
||||||
));
|
))
|
||||||
|
.collect();
|
||||||
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
|
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
|
||||||
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
|
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
|
||||||
recovered_data_shreds.append(&mut result);
|
recovered_data_shreds.append(&mut result);
|
||||||
@ -724,8 +730,7 @@ impl Blockstore {
|
|||||||
db: &Database,
|
db: &Database,
|
||||||
erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||||
prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
prev_inserted_shreds: &HashMap<ShredId, Shred>,
|
||||||
prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
|
||||||
) -> Vec<Shred> {
|
) -> Vec<Shred> {
|
||||||
let data_cf = db.column::<cf::ShredData>();
|
let data_cf = db.column::<cf::ShredData>();
|
||||||
let code_cf = db.column::<cf::ShredCode>();
|
let code_cf = db.column::<cf::ShredCode>();
|
||||||
@ -743,8 +748,7 @@ impl Blockstore {
|
|||||||
Self::recover_shreds(
|
Self::recover_shreds(
|
||||||
index,
|
index,
|
||||||
erasure_meta,
|
erasure_meta,
|
||||||
prev_inserted_datas,
|
prev_inserted_shreds,
|
||||||
prev_inserted_codes,
|
|
||||||
&mut recovered_data_shreds,
|
&mut recovered_data_shreds,
|
||||||
&data_cf,
|
&data_cf,
|
||||||
&code_cf,
|
&code_cf,
|
||||||
@ -790,8 +794,7 @@ impl Blockstore {
|
|||||||
let db = &*self.db;
|
let db = &*self.db;
|
||||||
let mut write_batch = db.batch()?;
|
let mut write_batch = db.batch()?;
|
||||||
|
|
||||||
let mut just_inserted_coding_shreds = HashMap::new();
|
let mut just_inserted_shreds = HashMap::with_capacity(shreds.len());
|
||||||
let mut just_inserted_data_shreds = HashMap::new();
|
|
||||||
let mut erasure_metas = HashMap::new();
|
let mut erasure_metas = HashMap::new();
|
||||||
let mut slot_meta_working_set = HashMap::new();
|
let mut slot_meta_working_set = HashMap::new();
|
||||||
let mut index_working_set = HashMap::new();
|
let mut index_working_set = HashMap::new();
|
||||||
@ -815,7 +818,7 @@ impl Blockstore {
|
|||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut slot_meta_working_set,
|
&mut slot_meta_working_set,
|
||||||
&mut write_batch,
|
&mut write_batch,
|
||||||
&mut just_inserted_data_shreds,
|
&mut just_inserted_shreds,
|
||||||
&mut index_meta_time,
|
&mut index_meta_time,
|
||||||
is_trusted,
|
is_trusted,
|
||||||
handle_duplicate,
|
handle_duplicate,
|
||||||
@ -843,7 +846,7 @@ impl Blockstore {
|
|||||||
&mut erasure_metas,
|
&mut erasure_metas,
|
||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut write_batch,
|
&mut write_batch,
|
||||||
&mut just_inserted_coding_shreds,
|
&mut just_inserted_shreds,
|
||||||
&mut index_meta_time,
|
&mut index_meta_time,
|
||||||
handle_duplicate,
|
handle_duplicate,
|
||||||
is_trusted,
|
is_trusted,
|
||||||
@ -862,8 +865,7 @@ impl Blockstore {
|
|||||||
db,
|
db,
|
||||||
&erasure_metas,
|
&erasure_metas,
|
||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut just_inserted_data_shreds,
|
&just_inserted_shreds,
|
||||||
&just_inserted_coding_shreds,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
metrics.num_recovered += recovered_data_shreds.len();
|
metrics.num_recovered += recovered_data_shreds.len();
|
||||||
@ -882,7 +884,7 @@ impl Blockstore {
|
|||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut slot_meta_working_set,
|
&mut slot_meta_working_set,
|
||||||
&mut write_batch,
|
&mut write_batch,
|
||||||
&mut just_inserted_data_shreds,
|
&mut just_inserted_shreds,
|
||||||
&mut index_meta_time,
|
&mut index_meta_time,
|
||||||
is_trusted,
|
is_trusted,
|
||||||
&handle_duplicate,
|
&handle_duplicate,
|
||||||
@ -1010,6 +1012,8 @@ impl Blockstore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
|
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
|
||||||
|
// TODO should also compare first-coding-index once position field is
|
||||||
|
// populated across cluster.
|
||||||
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|
||||||
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
|
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
|
||||||
}
|
}
|
||||||
@ -1021,7 +1025,7 @@ impl Blockstore {
|
|||||||
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
|
||||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
just_received_shreds: &mut HashMap<ShredId, Shred>,
|
||||||
index_meta_time: &mut u64,
|
index_meta_time: &mut u64,
|
||||||
handle_duplicate: &F,
|
handle_duplicate: &F,
|
||||||
is_trusted: bool,
|
is_trusted: bool,
|
||||||
@ -1070,7 +1074,7 @@ impl Blockstore {
|
|||||||
&shred,
|
&shred,
|
||||||
slot,
|
slot,
|
||||||
erasure_meta,
|
erasure_meta,
|
||||||
just_received_coding_shreds,
|
just_received_shreds,
|
||||||
);
|
);
|
||||||
if let Some(conflicting_shred) = conflicting_shred {
|
if let Some(conflicting_shred) = conflicting_shred {
|
||||||
if self
|
if self
|
||||||
@ -1109,8 +1113,7 @@ impl Blockstore {
|
|||||||
metrics.num_inserted += 1;
|
metrics.num_inserted += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let HashMapEntry::Vacant(entry) = just_received_coding_shreds.entry((slot, shred_index))
|
if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
|
||||||
{
|
|
||||||
metrics.num_coding_shreds_inserted += 1;
|
metrics.num_coding_shreds_inserted += 1;
|
||||||
entry.insert(shred);
|
entry.insert(shred);
|
||||||
}
|
}
|
||||||
@ -1123,30 +1126,27 @@ impl Blockstore {
|
|||||||
shred: &Shred,
|
shred: &Shred,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
erasure_meta: &ErasureMeta,
|
erasure_meta: &ErasureMeta,
|
||||||
just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
just_received_shreds: &HashMap<ShredId, Shred>,
|
||||||
) -> Option<Vec<u8>> {
|
) -> Option<Vec<u8>> {
|
||||||
// Search for the shred which set the initial erasure config, either inserted,
|
// Search for the shred which set the initial erasure config, either inserted,
|
||||||
// or in the current batch in just_received_coding_shreds.
|
// or in the current batch in just_received_shreds.
|
||||||
let mut conflicting_shred = None;
|
|
||||||
for coding_index in erasure_meta.coding_shreds_indices() {
|
for coding_index in erasure_meta.coding_shreds_indices() {
|
||||||
let maybe_shred = self.get_coding_shred(slot, coding_index);
|
let maybe_shred = self.get_coding_shred(slot, coding_index);
|
||||||
if let Ok(Some(shred_data)) = maybe_shred {
|
if let Ok(Some(shred_data)) = maybe_shred {
|
||||||
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
|
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
|
||||||
if Self::erasure_mismatch(&potential_shred, shred) {
|
if Self::erasure_mismatch(&potential_shred, shred) {
|
||||||
conflicting_shred = Some(potential_shred.payload);
|
return Some(potential_shred.payload);
|
||||||
}
|
}
|
||||||
break;
|
} else if let Some(potential_shred) = {
|
||||||
} else if let Some(potential_shred) =
|
let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code);
|
||||||
just_received_coding_shreds.get(&(slot, coding_index))
|
just_received_shreds.get(&key)
|
||||||
{
|
} {
|
||||||
if Self::erasure_mismatch(potential_shred, shred) {
|
if Self::erasure_mismatch(potential_shred, shred) {
|
||||||
conflicting_shred = Some(potential_shred.payload.clone());
|
return Some(potential_shred.payload.clone());
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
None
|
||||||
conflicting_shred
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
@ -1157,7 +1157,7 @@ impl Blockstore {
|
|||||||
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
|
||||||
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
|
||||||
write_batch: &mut WriteBatch,
|
write_batch: &mut WriteBatch,
|
||||||
just_inserted_data_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
just_inserted_shreds: &mut HashMap<ShredId, Shred>,
|
||||||
index_meta_time: &mut u64,
|
index_meta_time: &mut u64,
|
||||||
is_trusted: bool,
|
is_trusted: bool,
|
||||||
handle_duplicate: &F,
|
handle_duplicate: &F,
|
||||||
@ -1208,7 +1208,7 @@ impl Blockstore {
|
|||||||
if !self.should_insert_data_shred(
|
if !self.should_insert_data_shred(
|
||||||
&shred,
|
&shred,
|
||||||
slot_meta,
|
slot_meta,
|
||||||
just_inserted_data_shreds,
|
just_inserted_shreds,
|
||||||
&self.last_root,
|
&self.last_root,
|
||||||
leader_schedule,
|
leader_schedule,
|
||||||
shred_source.clone(),
|
shred_source.clone(),
|
||||||
@ -1225,7 +1225,7 @@ impl Blockstore {
|
|||||||
write_batch,
|
write_batch,
|
||||||
shred_source,
|
shred_source,
|
||||||
)?;
|
)?;
|
||||||
just_inserted_data_shreds.insert((slot, shred_index), shred);
|
just_inserted_shreds.insert(shred.id(), shred);
|
||||||
index_meta_working_set_entry.did_insert_occur = true;
|
index_meta_working_set_entry.did_insert_occur = true;
|
||||||
slot_meta_entry.did_insert_occur = true;
|
slot_meta_entry.did_insert_occur = true;
|
||||||
if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
|
if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
|
||||||
@ -1269,11 +1269,12 @@ impl Blockstore {
|
|||||||
|
|
||||||
fn get_data_shred_from_just_inserted_or_db<'a>(
|
fn get_data_shred_from_just_inserted_or_db<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
just_inserted_data_shreds: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
just_inserted_shreds: &'a HashMap<ShredId, Shred>,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
index: u64,
|
index: u64,
|
||||||
) -> Cow<'a, Vec<u8>> {
|
) -> Cow<'a, Vec<u8>> {
|
||||||
if let Some(shred) = just_inserted_data_shreds.get(&(slot, index)) {
|
let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data);
|
||||||
|
if let Some(shred) = just_inserted_shreds.get(&key) {
|
||||||
Cow::Borrowed(&shred.payload)
|
Cow::Borrowed(&shred.payload)
|
||||||
} else {
|
} else {
|
||||||
// If it doesn't exist in the just inserted set, it must exist in
|
// If it doesn't exist in the just inserted set, it must exist in
|
||||||
@ -1286,7 +1287,7 @@ impl Blockstore {
|
|||||||
&self,
|
&self,
|
||||||
shred: &Shred,
|
shred: &Shred,
|
||||||
slot_meta: &SlotMeta,
|
slot_meta: &SlotMeta,
|
||||||
just_inserted_data_shreds: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
|
just_inserted_shreds: &HashMap<ShredId, Shred>,
|
||||||
last_root: &RwLock<u64>,
|
last_root: &RwLock<u64>,
|
||||||
leader_schedule: Option<&LeaderScheduleCache>,
|
leader_schedule: Option<&LeaderScheduleCache>,
|
||||||
shred_source: ShredSource,
|
shred_source: ShredSource,
|
||||||
@ -1343,7 +1344,7 @@ impl Blockstore {
|
|||||||
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
|
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
|
||||||
|
|
||||||
let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
|
let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
|
||||||
just_inserted_data_shreds,
|
just_inserted_shreds,
|
||||||
slot,
|
slot,
|
||||||
last_index.unwrap(),
|
last_index.unwrap(),
|
||||||
);
|
);
|
||||||
@ -1379,7 +1380,7 @@ impl Blockstore {
|
|||||||
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
|
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
|
||||||
|
|
||||||
let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
|
let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
|
||||||
just_inserted_data_shreds,
|
just_inserted_shreds,
|
||||||
slot,
|
slot,
|
||||||
slot_meta.received - 1,
|
slot_meta.received - 1,
|
||||||
);
|
);
|
||||||
@ -3000,13 +3001,8 @@ impl Blockstore {
|
|||||||
// Returns the existing shred if `new_shred` is not equal to the existing shred at the
|
// Returns the existing shred if `new_shred` is not equal to the existing shred at the
|
||||||
// given slot and index as this implies the leader generated two different shreds with
|
// given slot and index as this implies the leader generated two different shreds with
|
||||||
// the same slot and index
|
// the same slot and index
|
||||||
pub fn is_shred_duplicate(
|
pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec<u8>) -> Option<Vec<u8>> {
|
||||||
&self,
|
let (slot, index, shred_type) = shred.unwrap();
|
||||||
slot: u64,
|
|
||||||
index: u32,
|
|
||||||
mut payload: Vec<u8>,
|
|
||||||
shred_type: ShredType,
|
|
||||||
) -> Option<Vec<u8>> {
|
|
||||||
let existing_shred = match shred_type {
|
let existing_shred = match shred_type {
|
||||||
ShredType::Data => self.get_data_shred(slot, index as u64),
|
ShredType::Data => self.get_data_shred(slot, index as u64),
|
||||||
ShredType::Code => self.get_coding_shred(slot, index as u64),
|
ShredType::Code => self.get_coding_shred(slot, index as u64),
|
||||||
@ -5549,7 +5545,7 @@ pub mod tests {
|
|||||||
|
|
||||||
let mut erasure_metas = HashMap::new();
|
let mut erasure_metas = HashMap::new();
|
||||||
let mut index_working_set = HashMap::new();
|
let mut index_working_set = HashMap::new();
|
||||||
let mut just_received_coding_shreds = HashMap::new();
|
let mut just_received_shreds = HashMap::new();
|
||||||
let mut write_batch = blockstore.db.batch().unwrap();
|
let mut write_batch = blockstore.db.batch().unwrap();
|
||||||
let mut index_meta_time = 0;
|
let mut index_meta_time = 0;
|
||||||
assert!(blockstore.check_insert_coding_shred(
|
assert!(blockstore.check_insert_coding_shred(
|
||||||
@ -5557,7 +5553,7 @@ pub mod tests {
|
|||||||
&mut erasure_metas,
|
&mut erasure_metas,
|
||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut write_batch,
|
&mut write_batch,
|
||||||
&mut just_received_coding_shreds,
|
&mut just_received_shreds,
|
||||||
&mut index_meta_time,
|
&mut index_meta_time,
|
||||||
&|_shred| {
|
&|_shred| {
|
||||||
panic!("no dupes");
|
panic!("no dupes");
|
||||||
@ -5575,7 +5571,7 @@ pub mod tests {
|
|||||||
&mut erasure_metas,
|
&mut erasure_metas,
|
||||||
&mut index_working_set,
|
&mut index_working_set,
|
||||||
&mut write_batch,
|
&mut write_batch,
|
||||||
&mut just_received_coding_shreds,
|
&mut just_received_shreds,
|
||||||
&mut index_meta_time,
|
&mut index_meta_time,
|
||||||
&|_shred| {
|
&|_shred| {
|
||||||
counter.fetch_add(1, Ordering::Relaxed);
|
counter.fetch_add(1, Ordering::Relaxed);
|
||||||
@ -8081,19 +8077,15 @@ pub mod tests {
|
|||||||
// Check if shreds are duplicated
|
// Check if shreds are duplicated
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blockstore.is_shred_duplicate(
|
blockstore.is_shred_duplicate(
|
||||||
slot,
|
ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()),
|
||||||
0,
|
|
||||||
duplicate_shred.payload.clone(),
|
duplicate_shred.payload.clone(),
|
||||||
duplicate_shred.shred_type(),
|
|
||||||
),
|
),
|
||||||
Some(shred.payload.to_vec())
|
Some(shred.payload.to_vec())
|
||||||
);
|
);
|
||||||
assert!(blockstore
|
assert!(blockstore
|
||||||
.is_shred_duplicate(
|
.is_shred_duplicate(
|
||||||
slot,
|
ShredId::new(slot, /*index:*/ 0, non_duplicate_shred.shred_type()),
|
||||||
0,
|
non_duplicate_shred.payload,
|
||||||
non_duplicate_shred.payload.clone(),
|
|
||||||
non_duplicate_shred.shred_type(),
|
|
||||||
)
|
)
|
||||||
.is_none());
|
.is_none());
|
||||||
|
|
||||||
@ -8561,10 +8553,12 @@ pub mod tests {
|
|||||||
std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
|
std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
|
||||||
assert!(blockstore
|
assert!(blockstore
|
||||||
.is_shred_duplicate(
|
.is_shred_duplicate(
|
||||||
slot,
|
ShredId::new(
|
||||||
even_smaller_last_shred_duplicate.index(),
|
slot,
|
||||||
|
even_smaller_last_shred_duplicate.index(),
|
||||||
|
ShredType::Data
|
||||||
|
),
|
||||||
even_smaller_last_shred_duplicate.payload.clone(),
|
even_smaller_last_shred_duplicate.payload.clone(),
|
||||||
ShredType::Data,
|
|
||||||
)
|
)
|
||||||
.is_some());
|
.is_some());
|
||||||
blockstore
|
blockstore
|
||||||
|
@ -236,6 +236,20 @@ pub struct Shred {
|
|||||||
pub payload: Vec<u8>,
|
pub payload: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tuple which should uniquely identify a shred if it exists.
|
||||||
|
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
|
||||||
|
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
|
||||||
|
|
||||||
|
impl ShredId {
|
||||||
|
pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
|
||||||
|
ShredId(slot, index, shred_type)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
|
||||||
|
(self.0, self.1, self.2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Shred {
|
impl Shred {
|
||||||
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
|
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
|
||||||
where
|
where
|
||||||
@ -438,6 +452,11 @@ impl Shred {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unique identifier for each shred.
|
||||||
|
pub fn id(&self) -> ShredId {
|
||||||
|
ShredId(self.slot(), self.index(), self.shred_type())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn slot(&self) -> Slot {
|
pub fn slot(&self) -> Slot {
|
||||||
self.common_header.slot
|
self.common_header.slot
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user