* removes next_shred_index from return value of entries to shreds api (#21961)
next-shred-index is already readily available from returned data shreds.
The commit simplifies the api for upcoming changes to erasure coding
schema which will require explicit tracking of indices for coding shreds
as well as data shreds.
(cherry picked from commit 89d66c3210)
# Conflicts:
# core/benches/shredder.rs
# core/src/broadcast_stage/broadcast_duplicates_run.rs
# core/src/broadcast_stage/broadcast_fake_shreds_run.rs
# core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
# core/src/broadcast_stage/standard_broadcast_run.rs
# gossip/src/duplicate_shred.rs
# ledger/src/blockstore.rs
# ledger/src/shred.rs
# ledger/tests/shred.rs
* removes mergify merge conflicts
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -43,15 +43,13 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
|
||||
);
|
||||
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
|
||||
let shredder = Shredder::new(1, 0, Arc::new(Keypair::new()), 0, 0).unwrap();
|
||||
let data_shreds = shredder
|
||||
.entries_to_data_shreds(
|
||||
&entries,
|
||||
true, // is_last_in_slot
|
||||
0, // next_shred_index
|
||||
0, // fec_set_offset
|
||||
&mut ProcessShredsStats::default(),
|
||||
)
|
||||
.0;
|
||||
let data_shreds = shredder.entries_to_data_shreds(
|
||||
&entries,
|
||||
true, // is_last_in_slot
|
||||
0, // next_shred_index
|
||||
0, // fec_set_offset
|
||||
&mut ProcessShredsStats::default(),
|
||||
);
|
||||
assert!(data_shreds.len() >= num_shreds);
|
||||
data_shreds
|
||||
}
|
||||
|
@ -181,22 +181,23 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds(
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
self.next_shred_index,
|
||||
);
|
||||
|
||||
self.next_shred_index += data_shreds.len() as u32;
|
||||
let (duplicate_entries, next_duplicate_shred_index) =
|
||||
self.queue_or_create_duplicate_entries(&bank, &receive_results);
|
||||
let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() {
|
||||
let (duplicate_data_shreds, duplicate_coding_shreds) = if !duplicate_entries.is_empty() {
|
||||
shredder.entries_to_shreds(
|
||||
&duplicate_entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_duplicate_shred_index,
|
||||
)
|
||||
} else {
|
||||
(vec![], vec![], 0)
|
||||
(vec![], vec![])
|
||||
};
|
||||
|
||||
// Manually track the shred index because relying on slot meta consumed is racy
|
||||
@ -204,8 +205,6 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
self.next_shred_index = 0;
|
||||
self.duplicate_queue
|
||||
.register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default());
|
||||
} else {
|
||||
self.next_shred_index = last_shred_index;
|
||||
}
|
||||
|
||||
// Partition network with duplicate and real shreds based on stake
|
||||
|
@ -53,7 +53,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
@ -69,7 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
.map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
|
||||
.collect();
|
||||
|
||||
let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds(
|
||||
let (fake_data_shreds, fake_coding_shreds) = shredder.entries_to_shreds(
|
||||
&fake_entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
|
@ -85,7 +85,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, _, _) = shredder.entries_to_shreds(
|
||||
let (data_shreds, _) = shredder.entries_to_shreds(
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
|
||||
self.next_shred_index,
|
||||
@ -93,10 +93,10 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
|
||||
self.next_shred_index += data_shreds.len() as u32;
|
||||
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
|
||||
let (good_last_data_shred, _, _) =
|
||||
let (good_last_data_shred, _) =
|
||||
shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index);
|
||||
|
||||
let (bad_last_data_shred, _, _) =
|
||||
let (bad_last_data_shred, _) =
|
||||
// Don't mark the last shred as last so that validators won't know that
|
||||
// they've gotten all the shreds, and will continue trying to repair
|
||||
shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index);
|
||||
|
@ -123,7 +123,7 @@ impl StandardBroadcastRun {
|
||||
None => (0, 0),
|
||||
},
|
||||
};
|
||||
let (data_shreds, next_shred_index) = Shredder::new(
|
||||
let data_shreds = Shredder::new(
|
||||
slot,
|
||||
parent_slot,
|
||||
self.keypair.clone(),
|
||||
@ -146,6 +146,10 @@ impl StandardBroadcastRun {
|
||||
None => Vec::default(),
|
||||
};
|
||||
data_shreds_buffer.extend(data_shreds.clone());
|
||||
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
|
||||
Some(index) => index + 1,
|
||||
None => next_shred_index,
|
||||
};
|
||||
self.unfinished_slot = Some(UnfinishedSlotInfo {
|
||||
next_shred_index,
|
||||
slot,
|
||||
|
@ -334,7 +334,7 @@ pub(crate) mod tests {
|
||||
})
|
||||
.take(5)
|
||||
.collect();
|
||||
let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds(
|
||||
let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds(
|
||||
&entries,
|
||||
true, // is_last_in_slot
|
||||
next_shred_index,
|
||||
|
@ -1665,7 +1665,7 @@ impl Blockstore {
|
||||
0
|
||||
}
|
||||
};
|
||||
let (mut data_shreds, mut coding_shreds, _) =
|
||||
let (mut data_shreds, mut coding_shreds) =
|
||||
shredder.entries_to_shreds(&current_entries, true, start_index);
|
||||
all_shreds.append(&mut data_shreds);
|
||||
all_shreds.append(&mut coding_shreds);
|
||||
@ -1686,7 +1686,7 @@ impl Blockstore {
|
||||
}
|
||||
|
||||
if !slot_entries.is_empty() {
|
||||
let (mut data_shreds, mut coding_shreds, _) =
|
||||
let (mut data_shreds, mut coding_shreds) =
|
||||
shredder.entries_to_shreds(&slot_entries, is_full_slot, 0);
|
||||
all_shreds.append(&mut data_shreds);
|
||||
all_shreds.append(&mut coding_shreds);
|
||||
@ -8104,7 +8104,7 @@ pub mod tests {
|
||||
let entries = make_slot_entries_with_transactions(num_entries);
|
||||
let leader_keypair = Arc::new(Keypair::new());
|
||||
let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap();
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
|
||||
let genesis_config = create_genesis_config(2).genesis_config;
|
||||
let bank = Arc::new(Bank::new(&genesis_config));
|
||||
@ -8156,8 +8156,8 @@ pub mod tests {
|
||||
let entries2 = make_slot_entries_with_transactions(1);
|
||||
let leader_keypair = Arc::new(Keypair::new());
|
||||
let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap();
|
||||
let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0);
|
||||
let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0);
|
||||
let (shreds, _) = shredder.entries_to_shreds(&entries1, true, 0);
|
||||
let (duplicate_shreds, _) = shredder.entries_to_shreds(&entries2, true, 0);
|
||||
let shred = shreds[0].clone();
|
||||
let duplicate_shred = duplicate_shreds[0].clone();
|
||||
let non_duplicate_shred = shred.clone();
|
||||
|
@ -775,9 +775,12 @@ impl Shredder {
|
||||
entries: &[Entry],
|
||||
is_last_in_slot: bool,
|
||||
next_shred_index: u32,
|
||||
) -> (Vec<Shred>, Vec<Shred>, u32) {
|
||||
) -> (
|
||||
Vec<Shred>, // data shreds
|
||||
Vec<Shred>, // coding shreds
|
||||
) {
|
||||
let mut stats = ProcessShredsStats::default();
|
||||
let (data_shreds, last_shred_index) = self.entries_to_data_shreds(
|
||||
let data_shreds = self.entries_to_data_shreds(
|
||||
entries,
|
||||
is_last_in_slot,
|
||||
next_shred_index,
|
||||
@ -791,7 +794,7 @@ impl Shredder {
|
||||
&mut stats,
|
||||
)
|
||||
.unwrap();
|
||||
(data_shreds, coding_shreds, last_shred_index)
|
||||
(data_shreds, coding_shreds)
|
||||
}
|
||||
|
||||
// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds.
|
||||
@ -812,7 +815,7 @@ impl Shredder {
|
||||
// Shred index offset at which FEC sets are generated.
|
||||
fec_set_offset: u32,
|
||||
process_stats: &mut ProcessShredsStats,
|
||||
) -> (Vec<Shred>, u32) {
|
||||
) -> Vec<Shred> {
|
||||
let mut serialize_time = Measure::start("shred_serialize");
|
||||
let serialized_shreds =
|
||||
bincode::serialize(entries).expect("Expect to serialize all entries");
|
||||
@ -860,7 +863,7 @@ impl Shredder {
|
||||
process_stats.serialize_elapsed += serialize_time.as_us();
|
||||
process_stats.gen_data_elapsed += gen_data_time.as_us();
|
||||
|
||||
(data_shreds, last_shred_index + 1)
|
||||
data_shreds
|
||||
}
|
||||
|
||||
pub fn data_shreds_to_coding_shreds(
|
||||
@ -1323,8 +1326,8 @@ pub mod tests {
|
||||
.saturating_sub(num_expected_data_shreds as usize)
|
||||
.max(num_expected_data_shreds as usize);
|
||||
let start_index = 0;
|
||||
let (data_shreds, coding_shreds, next_index) =
|
||||
shredder.entries_to_shreds(&entries, true, start_index);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index);
|
||||
let next_index = data_shreds.last().unwrap().index() + 1;
|
||||
assert_eq!(next_index as u64, num_expected_data_shreds);
|
||||
|
||||
let mut data_shred_indexes = HashSet::new();
|
||||
@ -1476,7 +1479,7 @@ pub mod tests {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
|
||||
for (i, s) in data_shreds.iter().enumerate() {
|
||||
verify_test_data_shred(
|
||||
@ -1524,7 +1527,7 @@ pub mod tests {
|
||||
.collect();
|
||||
|
||||
let serialized_entries = bincode::serialize(&entries).unwrap();
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
|
||||
&entries,
|
||||
is_last_in_slot,
|
||||
0, // next_shred_index
|
||||
@ -1654,7 +1657,7 @@ pub mod tests {
|
||||
// Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds
|
||||
// and 2 missing coding shreds. Hint: should work
|
||||
let serialized_entries = bincode::serialize(&entries).unwrap();
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 25);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 25);
|
||||
// We should have 10 shreds now
|
||||
assert_eq!(data_shreds.len(), num_data_shreds);
|
||||
|
||||
@ -1732,7 +1735,7 @@ pub mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
let next_shred_index = rng.gen_range(1, 1024);
|
||||
let (data_shreds, coding_shreds, _) =
|
||||
let (data_shreds, coding_shreds) =
|
||||
shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index);
|
||||
let num_data_shreds = data_shreds.len();
|
||||
let mut shreds = coding_shreds;
|
||||
@ -1786,8 +1789,7 @@ pub mod tests {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (data_shreds, coding_shreds, _next_index) =
|
||||
shredder.entries_to_shreds(&entries, true, 0);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
assert!(!data_shreds
|
||||
.iter()
|
||||
.chain(coding_shreds.iter())
|
||||
@ -1835,8 +1837,7 @@ pub mod tests {
|
||||
.collect();
|
||||
|
||||
let start_index = 0x12;
|
||||
let (data_shreds, coding_shreds, _next_index) =
|
||||
shredder.entries_to_shreds(&entries, true, start_index);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index);
|
||||
|
||||
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
|
||||
data_shreds.iter().enumerate().for_each(|(i, s)| {
|
||||
@ -1872,7 +1873,7 @@ pub mod tests {
|
||||
|
||||
let mut stats = ProcessShredsStats::default();
|
||||
let start_index = 0x12;
|
||||
let (data_shreds, _next_index) = shredder.entries_to_data_shreds(
|
||||
let data_shreds = shredder.entries_to_data_shreds(
|
||||
&entries,
|
||||
true, // is_last_in_slot
|
||||
start_index,
|
||||
|
@ -50,7 +50,8 @@ fn test_multi_fec_block_coding() {
|
||||
.collect();
|
||||
|
||||
let serialized_entries = bincode::serialize(&entries).unwrap();
|
||||
let (data_shreds, coding_shreds, next_index) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0);
|
||||
let next_index = data_shreds.last().unwrap().index() + 1;
|
||||
assert_eq!(next_index as usize, num_data_shreds);
|
||||
assert_eq!(data_shreds.len(), num_data_shreds);
|
||||
assert_eq!(coding_shreds.len(), num_data_shreds);
|
||||
@ -219,7 +220,7 @@ fn setup_different_sized_fec_blocks(
|
||||
let total_num_data_shreds: usize = 2 * num_shreds_per_iter;
|
||||
for i in 0..2 {
|
||||
let is_last = i == 1;
|
||||
let (data_shreds, coding_shreds, new_next_index) =
|
||||
let (data_shreds, coding_shreds) =
|
||||
shredder.entries_to_shreds(&entries, is_last, next_index);
|
||||
for shred in &data_shreds {
|
||||
if (shred.index() as usize) == total_num_data_shreds - 1 {
|
||||
@ -233,7 +234,7 @@ fn setup_different_sized_fec_blocks(
|
||||
}
|
||||
}
|
||||
assert_eq!(data_shreds.len(), num_shreds_per_iter as usize);
|
||||
next_index = new_next_index;
|
||||
next_index = data_shreds.last().unwrap().index() + 1;
|
||||
sort_data_coding_into_fec_sets(
|
||||
data_shreds,
|
||||
coding_shreds,
|
||||
|
Reference in New Issue
Block a user