The indices for erasure coding shreds are tied to data shreds:
https://github.com/solana-labs/solana/blob/90f41fd9b/ledger/src/shred.rs#L921
However with the upcoming changes to erasure schema, there will be more
erasure coding shreds than data shreds and we can no longer infer coding
shreds indices from data shreds.
The commit adds constructs to track coding shreds indices explicitly.
(cherry picked from commit 65d59f4ef0
)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -496,6 +496,7 @@ pub mod test {
|
||||
&keypair,
|
||||
&data_shreds[0..],
|
||||
true, // is_last_in_slot
|
||||
0, // next_code_index
|
||||
&mut ProcessShredsStats::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
@ -28,6 +28,7 @@ pub(super) struct BroadcastDuplicatesRun {
|
||||
config: BroadcastDuplicatesConfig,
|
||||
current_slot: Slot,
|
||||
next_shred_index: u32,
|
||||
next_code_index: u32,
|
||||
shred_version: u16,
|
||||
recent_blockhash: Option<Hash>,
|
||||
prev_entry_hash: Option<Hash>,
|
||||
@ -46,6 +47,7 @@ impl BroadcastDuplicatesRun {
|
||||
Self {
|
||||
config,
|
||||
next_shred_index: u32::MAX,
|
||||
next_code_index: 0,
|
||||
shred_version,
|
||||
current_slot: 0,
|
||||
recent_blockhash: None,
|
||||
@ -74,6 +76,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
|
||||
if bank.slot() != self.current_slot {
|
||||
self.next_shred_index = 0;
|
||||
self.next_code_index = 0;
|
||||
self.current_slot = bank.slot();
|
||||
self.prev_entry_hash = None;
|
||||
self.num_slots_broadcasted += 1;
|
||||
@ -154,22 +157,26 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, _) = shredder.entries_to_shreds(
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
|
||||
self.next_shred_index,
|
||||
self.next_code_index,
|
||||
);
|
||||
|
||||
self.next_shred_index += data_shreds.len() as u32;
|
||||
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
|
||||
self.next_code_index = index + 1;
|
||||
}
|
||||
let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
|
||||
let (original_last_data_shred, _) =
|
||||
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index, self.next_code_index);
|
||||
|
||||
let (partition_last_data_shred, _) =
|
||||
// Don't mark the last shred as last so that validators won't know that
|
||||
// they've gotten all the shreds, and will continue trying to repair
|
||||
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index, self.next_code_index);
|
||||
|
||||
let sigs: Vec<_> = partition_last_data_shred.iter().map(|s| (s.signature(), s.index())).collect();
|
||||
info!(
|
||||
|
@ -10,6 +10,7 @@ pub(super) struct BroadcastFakeShredsRun {
|
||||
last_blockhash: Hash,
|
||||
partition: usize,
|
||||
shred_version: u16,
|
||||
next_code_index: u32,
|
||||
}
|
||||
|
||||
impl BroadcastFakeShredsRun {
|
||||
@ -18,6 +19,7 @@ impl BroadcastFakeShredsRun {
|
||||
last_blockhash: Hash::default(),
|
||||
partition,
|
||||
shred_version,
|
||||
next_code_index: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -57,6 +59,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
self.next_code_index,
|
||||
);
|
||||
|
||||
// If the last blockhash is default, a new block is being created
|
||||
@ -74,8 +77,18 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
&fake_entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
self.next_code_index,
|
||||
);
|
||||
|
||||
if let Some(index) = coding_shreds
|
||||
.iter()
|
||||
.chain(&fake_coding_shreds)
|
||||
.map(Shred::index)
|
||||
.max()
|
||||
{
|
||||
self.next_code_index = index + 1;
|
||||
}
|
||||
|
||||
// If it's the last tick, reset the last block hash to default
|
||||
// this will cause next run to grab last bank's blockhash
|
||||
if last_tick_height == bank.max_tick_height() {
|
||||
|
@ -21,6 +21,7 @@ pub(super) struct ReceiveResults {
|
||||
#[derive(Clone)]
|
||||
pub struct UnfinishedSlotInfo {
|
||||
pub next_shred_index: u32,
|
||||
pub(crate) next_code_index: u32,
|
||||
pub slot: Slot,
|
||||
pub parent: Slot,
|
||||
// Data shreds buffered to make a batch of size
|
||||
|
@ -15,6 +15,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
|
||||
good_shreds: Vec<Shred>,
|
||||
current_slot: Slot,
|
||||
next_shred_index: u32,
|
||||
next_code_index: u32,
|
||||
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
|
||||
}
|
||||
|
||||
@ -29,6 +30,7 @@ impl FailEntryVerificationBroadcastRun {
|
||||
good_shreds: vec![],
|
||||
current_slot: 0,
|
||||
next_shred_index: 0,
|
||||
next_code_index: 0,
|
||||
cluster_nodes_cache,
|
||||
}
|
||||
}
|
||||
@ -50,6 +52,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
|
||||
if bank.slot() != self.current_slot {
|
||||
self.next_shred_index = 0;
|
||||
self.next_code_index = 0;
|
||||
self.current_slot = bank.slot();
|
||||
}
|
||||
|
||||
@ -83,22 +86,26 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, _) = shredder.entries_to_shreds(
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
|
||||
self.next_shred_index,
|
||||
self.next_code_index,
|
||||
);
|
||||
|
||||
self.next_shred_index += data_shreds.len() as u32;
|
||||
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
|
||||
self.next_code_index = index + 1;
|
||||
}
|
||||
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
|
||||
let (good_last_data_shred, _) =
|
||||
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index, self.next_code_index);
|
||||
|
||||
let (bad_last_data_shred, _) =
|
||||
// Don't mark the last shred as last so that validators won't know that
|
||||
// they've gotten all the shreds, and will continue trying to repair
|
||||
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index, self.next_code_index);
|
||||
|
||||
self.next_shred_index += 1;
|
||||
(good_last_data_shred, bad_last_data_shred)
|
||||
|
@ -141,8 +141,13 @@ impl StandardBroadcastRun {
|
||||
Some(index) => index + 1,
|
||||
None => next_shred_index,
|
||||
};
|
||||
let next_code_index = match &self.unfinished_slot {
|
||||
Some(state) => state.next_code_index,
|
||||
None => 0,
|
||||
};
|
||||
self.unfinished_slot = Some(UnfinishedSlotInfo {
|
||||
next_shred_index,
|
||||
next_code_index,
|
||||
slot,
|
||||
parent: parent_slot,
|
||||
data_shreds_buffer,
|
||||
@ -449,23 +454,40 @@ fn make_coding_shreds(
|
||||
is_slot_end: bool,
|
||||
stats: &mut ProcessShredsStats,
|
||||
) -> Vec<Shred> {
|
||||
let data_shreds = match unfinished_slot {
|
||||
None => Vec::default(),
|
||||
Some(unfinished_slot) => {
|
||||
let size = unfinished_slot.data_shreds_buffer.len();
|
||||
// Consume a multiple of 32, unless this is the slot end.
|
||||
let offset = if is_slot_end {
|
||||
0
|
||||
} else {
|
||||
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
|
||||
};
|
||||
unfinished_slot
|
||||
.data_shreds_buffer
|
||||
.drain(0..size - offset)
|
||||
.collect()
|
||||
}
|
||||
let unfinished_slot = match unfinished_slot {
|
||||
None => return Vec::default(),
|
||||
Some(state) => state,
|
||||
};
|
||||
Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, is_slot_end, stats).unwrap()
|
||||
let data_shreds: Vec<_> = {
|
||||
let size = unfinished_slot.data_shreds_buffer.len();
|
||||
// Consume a multiple of 32, unless this is the slot end.
|
||||
let offset = if is_slot_end {
|
||||
0
|
||||
} else {
|
||||
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
|
||||
};
|
||||
unfinished_slot
|
||||
.data_shreds_buffer
|
||||
.drain(0..size - offset)
|
||||
.collect()
|
||||
};
|
||||
let shreds = Shredder::data_shreds_to_coding_shreds(
|
||||
keypair,
|
||||
&data_shreds,
|
||||
is_slot_end,
|
||||
unfinished_slot.next_code_index,
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
if let Some(index) = shreds
|
||||
.iter()
|
||||
.filter(|shred| shred.is_code())
|
||||
.map(Shred::index)
|
||||
.max()
|
||||
{
|
||||
unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1);
|
||||
}
|
||||
shreds
|
||||
}
|
||||
|
||||
impl BroadcastRun for StandardBroadcastRun {
|
||||
@ -582,6 +604,7 @@ mod test {
|
||||
let parent = 0;
|
||||
run.unfinished_slot = Some(UnfinishedSlotInfo {
|
||||
next_shred_index,
|
||||
next_code_index: 17,
|
||||
slot,
|
||||
parent,
|
||||
data_shreds_buffer: Vec::default(),
|
||||
|
@ -270,6 +270,7 @@ mod tests {
|
||||
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
|
||||
&[shred],
|
||||
false, // is_last_in_slot
|
||||
3, // next_code_index
|
||||
);
|
||||
coding[0].copy_to_packet(&mut packet);
|
||||
ShredFetchStage::process_packet(
|
||||
|
@ -736,7 +736,12 @@ mod test {
|
||||
keypair: &Keypair,
|
||||
) -> Vec<Shred> {
|
||||
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
|
||||
shredder.entries_to_shreds(keypair, entries, true, 0).0
|
||||
let (data_shreds, _) = shredder.entries_to_shreds(
|
||||
keypair, entries, true, // is_last_in_slot
|
||||
0, // next_shred_index
|
||||
0, // next_code_index
|
||||
);
|
||||
data_shreds
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user