diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index b1a680a39a..fff28699e0 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -43,15 +43,13 @@ fn make_shreds(num_shreds: usize) -> Vec { ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); let shredder = Shredder::new(1, 0, Arc::new(Keypair::new()), 0, 0).unwrap(); - let data_shreds = shredder - .entries_to_data_shreds( - &entries, - true, // is_last_in_slot - 0, // next_shred_index - 0, // fec_set_offset - &mut ProcessShredsStats::default(), - ) - .0; + let data_shreds = shredder.entries_to_data_shreds( + &entries, + true, // is_last_in_slot + 0, // next_shred_index + 0, // fec_set_offset + &mut ProcessShredsStats::default(), + ); assert!(data_shreds.len() >= num_shreds); data_shreds } diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 195fcc0dee..832d2bb0d6 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -181,22 +181,23 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &receive_results.entries, last_tick_height == bank.max_tick_height(), self.next_shred_index, ); + self.next_shred_index += data_shreds.len() as u32; let (duplicate_entries, next_duplicate_shred_index) = self.queue_or_create_duplicate_entries(&bank, &receive_results); - let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() { + let (duplicate_data_shreds, duplicate_coding_shreds) = if !duplicate_entries.is_empty() { shredder.entries_to_shreds( &duplicate_entries, last_tick_height == bank.max_tick_height(), next_duplicate_shred_index, ) } else { - (vec![], vec![], 0) + (vec![], vec![]) }; // Manually track the shred index because relying on slot meta consumed is racy @@ -204,8 +205,6 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.next_shred_index = 0; self.duplicate_queue .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default()); - } else { - self.next_shred_index = last_shred_index; } // Partition network with duplicate and real shreds based on stake diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 9b80131f17..a3f515030a 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -53,7 +53,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &receive_results.entries, last_tick_height == bank.max_tick_height(), next_shred_index, @@ -69,7 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { .map(|_| Entry::new(&self.last_blockhash, 0, vec![])) .collect(); - let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds( + let (fake_data_shreds, fake_coding_shreds) = shredder.entries_to_shreds( &fake_entries, last_tick_height == bank.max_tick_height(), next_shred_index, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 4f08d42f6f..8915f9a262 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -85,7 +85,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, _, _) = shredder.entries_to_shreds( + let (data_shreds, _) = shredder.entries_to_shreds( &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, @@ -93,10 +93,10 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index += data_shreds.len() as u32; let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { - let (good_last_data_shred, _, _) = + let (good_last_data_shred, _) = shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); - let (bad_last_data_shred, _, _) = + let (bad_last_data_shred, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 7686245913..ab5bfc696b 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -123,7 +123,7 @@ impl StandardBroadcastRun { None => (0, 0), }, }; - let (data_shreds, next_shred_index) = Shredder::new( + let data_shreds = Shredder::new( slot, parent_slot, self.keypair.clone(), @@ -146,6 +146,10 @@ impl StandardBroadcastRun { None => Vec::default(), }; data_shreds_buffer.extend(data_shreds.clone()); + let next_shred_index = match data_shreds.iter().map(Shred::index).max() { + Some(index) => index + 1, + None => next_shred_index, + }; self.unfinished_slot = Some(UnfinishedSlotInfo { next_shred_index, slot, diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 15d51c116d..17684f0f9b 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -334,7 +334,7 @@ pub(crate) mod tests { }) .take(5) .collect(); - let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds( + let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds( &entries, true, // is_last_in_slot next_shred_index, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 8f5cb9af5e..42c7dbae9d 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1665,7 +1665,7 @@ impl Blockstore { 0 } }; - let (mut data_shreds, mut coding_shreds, _) = + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(¤t_entries, true, start_index); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); @@ -1686,7 +1686,7 @@ impl Blockstore { } if !slot_entries.is_empty() { - let (mut data_shreds, mut coding_shreds, _) = + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(&slot_entries, is_full_slot, 0); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); @@ -8104,7 +8104,7 @@ pub mod tests { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new(&genesis_config)); @@ -8156,8 +8156,8 @@ pub mod tests { let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap(); - let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0); - let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0); + let (shreds, _) = shredder.entries_to_shreds(&entries1, true, 0); + let (duplicate_shreds, _) = shredder.entries_to_shreds(&entries2, true, 0); let shred = shreds[0].clone(); let duplicate_shred = duplicate_shreds[0].clone(); let non_duplicate_shred = shred.clone(); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 4aedb6e387..2ef8bbc6d3 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -775,9 +775,12 @@ impl Shredder { entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, - ) -> (Vec, Vec, u32) { + ) -> ( + Vec, // data shreds + Vec, // coding shreds + ) { let mut stats = ProcessShredsStats::default(); - let (data_shreds, last_shred_index) = self.entries_to_data_shreds( + let data_shreds = self.entries_to_data_shreds( entries, is_last_in_slot, next_shred_index, @@ -791,7 +794,7 @@ impl Shredder { &mut stats, ) .unwrap(); - (data_shreds, coding_shreds, last_shred_index) + (data_shreds, coding_shreds) } // Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds. @@ -812,7 +815,7 @@ impl Shredder { // Shred index offset at which FEC sets are generated. fec_set_offset: u32, process_stats: &mut ProcessShredsStats, - ) -> (Vec, u32) { + ) -> Vec { let mut serialize_time = Measure::start("shred_serialize"); let serialized_shreds = bincode::serialize(entries).expect("Expect to serialize all entries"); @@ -860,7 +863,7 @@ impl Shredder { process_stats.serialize_elapsed += serialize_time.as_us(); process_stats.gen_data_elapsed += gen_data_time.as_us(); - (data_shreds, last_shred_index + 1) + data_shreds } pub fn data_shreds_to_coding_shreds( @@ -1323,8 +1326,8 @@ pub mod tests { .saturating_sub(num_expected_data_shreds as usize) .max(num_expected_data_shreds as usize); let start_index = 0; - let (data_shreds, coding_shreds, next_index) = - shredder.entries_to_shreds(&entries, true, start_index); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); + let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as u64, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); @@ -1476,7 +1479,7 @@ pub mod tests { }) .collect(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); for (i, s) in data_shreds.iter().enumerate() { verify_test_data_shred( @@ -1524,7 +1527,7 @@ pub mod tests { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &entries, is_last_in_slot, 0, // next_shred_index @@ -1654,7 +1657,7 @@ pub mod tests { // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds // and 2 missing coding shreds. Hint: should work let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 25); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 25); // We should have 10 shreds now assert_eq!(data_shreds.len(), num_data_shreds); @@ -1732,7 +1735,7 @@ pub mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); - let (data_shreds, coding_shreds, _) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index); let num_data_shreds = data_shreds.len(); let mut shreds = coding_shreds; @@ -1786,8 +1789,7 @@ pub mod tests { }) .collect(); - let (data_shreds, coding_shreds, _next_index) = - shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); assert!(!data_shreds .iter() .chain(coding_shreds.iter()) @@ -1835,8 +1837,7 @@ pub mod tests { .collect(); let start_index = 0x12; - let (data_shreds, coding_shreds, _next_index) = - shredder.entries_to_shreds(&entries, true, start_index); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; data_shreds.iter().enumerate().for_each(|(i, s)| { @@ -1872,7 +1873,7 @@ pub mod tests { let mut stats = ProcessShredsStats::default(); let start_index = 0x12; - let (data_shreds, _next_index) = shredder.entries_to_data_shreds( + let data_shreds = shredder.entries_to_data_shreds( &entries, true, // is_last_in_slot start_index, diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index abb1ece13c..68dcf68163 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -50,7 +50,8 @@ fn test_multi_fec_block_coding() { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, next_index) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); + let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as usize, num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!(coding_shreds.len(), num_data_shreds); @@ -219,7 +220,7 @@ fn setup_different_sized_fec_blocks( let total_num_data_shreds: usize = 2 * num_shreds_per_iter; for i in 0..2 { let is_last = i == 1; - let (data_shreds, coding_shreds, new_next_index) = + let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, is_last, next_index); for shred in &data_shreds { if (shred.index() as usize) == total_num_data_shreds - 1 { @@ -233,7 +234,7 @@ fn setup_different_sized_fec_blocks( } } assert_eq!(data_shreds.len(), num_shreds_per_iter as usize); - next_index = new_next_index; + next_index = data_shreds.last().unwrap().index() + 1; sort_data_coding_into_fec_sets( data_shreds, coding_shreds,