From 37b8587d4e013117cbac22195d0230e0af61829e Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 21 Apr 2021 12:47:50 +0000 Subject: [PATCH] expands number of erasure coding shreds in the last batch in slots (#16484) Number of parity coding shreds is always less than the number of data shreds in FEC blocks: https://github.com/solana-labs/solana/blob/6907a2366/ledger/src/shred.rs#L719 Data shreds are batched in chunks of 32 shreds each: https://github.com/solana-labs/solana/blob/6907a2366/ledger/src/shred.rs#L714 However the very last batch of data shreds in a slot can be small, in which case the loss rate can be exacerbated. This commit expands the number of coding shreds in the last FEC block in slots to: 64 - number of data shreds; so that FEC blocks are always 64 data and parity coding shreds each. As a consequence of this, the last FEC block has more parity coding shreds than data shreds. So for some shred indices we will have a coding shred but no data shreds. This should not cause any kind of overlapping FEC blocks as in: https://github.com/solana-labs/solana/pull/10095 since this is done only for the very last batch in a slot, and the next slot will reset the shred index. --- core/benches/retransmit_stage.rs | 3 +- core/benches/shredder.rs | 19 +- core/src/broadcast_stage.rs | 4 +- .../broadcast_fake_shreds_run.rs | 3 +- .../fail_entry_verification_broadcast_run.rs | 1 - .../broadcast_stage/standard_broadcast_run.rs | 6 +- core/src/cluster_info.rs | 14 +- core/src/duplicate_shred.rs | 14 +- core/src/shred_fetch_stage.rs | 6 +- core/src/window_service.rs | 3 +- ledger/src/blockstore.rs | 50 +-- ledger/src/erasure.rs | 10 +- ledger/src/shred.rs | 365 +++++++++--------- ledger/tests/shred.rs | 7 +- 14 files changed, 243 insertions(+), 262 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 1014d2cd1a..06cc569dc1 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -81,8 +81,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { let keypair = Arc::new(Keypair::new()); let slot = 0; let parent = 0; - let shredder = - Shredder::new(slot, parent, 0.0, keypair, 0, 0).expect("Failed to create entry shredder"); + let shredder = Shredder::new(slot, parent, keypair, 0, 0).unwrap(); let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; let num_packets = data_shreds.len(); diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 00378a1eb1..1ea0cf50ae 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -8,8 +8,8 @@ use raptorq::{Decoder, Encoder}; use solana_ledger::entry::{create_ticks, Entry}; use solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder, - MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, SHRED_PAYLOAD_SIZE, - SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_DATA_SHRED_PAYLOAD, + MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS, + SIZE_OF_DATA_SHRED_PAYLOAD, }; use solana_perf::test_tx; use solana_sdk::hash::Hash; @@ -39,8 +39,7 @@ fn make_shreds(num_shreds: usize) -> Vec { Some(shred_size), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); - let shredder = - Shredder::new(1, 0, RECOMMENDED_FEC_RATE, Arc::new(Keypair::new()), 0, 0).unwrap(); + let shredder = Shredder::new(1, 0, Arc::new(Keypair::new()), 0, 0).unwrap(); let data_shreds = shredder .entries_to_data_shreds( &entries, @@ -75,7 +74,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { - let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(&entries, true, 0); }) } @@ -94,7 +93,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { let entries = make_large_unchained_entries(txs_per_entry, num_entries); // 1Mb bencher.iter(|| { - let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(&entries, true, 0); }) } @@ -107,7 +106,7 @@ fn bench_deshredder(bencher: &mut Bencher) { let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); - let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap(); + let shredder = Shredder::new(1, 0, kp, 0, 0).unwrap(); let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; bencher.iter(|| { let raw = &mut Shredder::deshred(&data_shreds).unwrap(); @@ -133,9 +132,8 @@ fn bench_shredder_coding(bencher: &mut Bencher) { let data_shreds = make_shreds(symbol_count); bencher.iter(|| { Shredder::generate_coding_shreds( - RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], - symbol_count, + true, // is_last_in_slot ) .len(); }) @@ -146,9 +144,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let data_shreds = make_shreds(symbol_count); let coding_shreds = Shredder::generate_coding_shreds( - RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], - symbol_count, + true, // is_last_in_slot ); bencher.iter(|| { Shredder::try_recovery( diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 275ae7f47e..f08ac8b826 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -447,7 +447,7 @@ pub mod test { entry::create_ticks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, - shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder, RECOMMENDED_FEC_RATE}, + shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder}, }; use solana_runtime::bank::Bank; use solana_sdk::{ @@ -476,7 +476,7 @@ pub mod test { let coding_shreds = Shredder::data_shreds_to_coding_shreds( &keypair, &data_shreds[0..], - RECOMMENDED_FEC_RATE, + true, // is_last_in_slot &mut ProcessShredsStats::default(), ) .unwrap(); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index ce512c6a04..9173be7ded 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -1,6 +1,6 @@ use super::*; use solana_ledger::entry::Entry; -use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE}; +use solana_ledger::shred::Shredder; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; @@ -47,7 +47,6 @@ impl BroadcastRun for BroadcastFakeShredsRun { let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - RECOMMENDED_FEC_RATE, self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 17d34e99a1..b66681786d 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -71,7 +71,6 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - 0.0, self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 6553084572..6c0bb12482 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -8,7 +8,7 @@ use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; use solana_ledger::{ entry::Entry, shred::{ - ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, + ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK, }, }; @@ -121,7 +121,6 @@ impl StandardBroadcastRun { let (data_shreds, next_shred_index) = Shredder::new( slot, parent_slot, - RECOMMENDED_FEC_RATE, self.keypair.clone(), reference_tick, self.shred_version, @@ -451,8 +450,7 @@ fn make_coding_shreds( .collect() } }; - Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, RECOMMENDED_FEC_RATE, stats) - .unwrap() + Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, is_slot_end, stats).unwrap() } impl BroadcastRun for StandardBroadcastRun { diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a7ada5291b..3c05c31354 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -3809,17 +3809,9 @@ mod tests { let mut rng = rand::thread_rng(); let leader = Arc::new(Keypair::new()); let keypair = Keypair::new(); - let (slot, parent_slot, fec_rate, reference_tick, version) = - (53084024, 53084023, 0.0, 0, 0); - let shredder = Shredder::new( - slot, - parent_slot, - fec_rate, - leader.clone(), - reference_tick, - version, - ) - .unwrap(); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = + Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap(); let next_shred_index = rng.gen(); let shred = new_rand_shred(&mut rng, next_shred_index, &shredder); let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload; diff --git a/core/src/duplicate_shred.rs b/core/src/duplicate_shred.rs index 47cec81db0..db67f08f6f 100644 --- a/core/src/duplicate_shred.rs +++ b/core/src/duplicate_shred.rs @@ -343,17 +343,9 @@ pub(crate) mod tests { fn test_duplicate_shred_round_trip() { let mut rng = rand::thread_rng(); let leader = Arc::new(Keypair::new()); - let (slot, parent_slot, fec_rate, reference_tick, version) = - (53084024, 53084023, 0.0, 0, 0); - let shredder = Shredder::new( - slot, - parent_slot, - fec_rate, - leader.clone(), - reference_tick, - version, - ) - .unwrap(); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = + Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap(); let next_shred_index = rng.gen(); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 4f79f8ba0e..b7946b54d1 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -262,8 +262,10 @@ mod tests { &hasher, ); assert!(!packet.meta.discard); - - let coding = solana_ledger::shred::Shredder::generate_coding_shreds(1.0f32, &[shred], 1); + let coding = solana_ledger::shred::Shredder::generate_coding_shreds( + &[shred], + false, // is_last_in_slot + ); coding[0].copy_to_packet(&mut packet); ShredFetchStage::process_packet( &mut packet, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 46d875fcd1..b303d282b2 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -636,8 +636,7 @@ mod test { parent: Slot, keypair: &Arc, ) -> Vec { - let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0, 0) - .expect("Failed to create entry shredder"); + let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(&entries, true, 0).0 } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index aabdedde63..e583b4ab6f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1518,7 +1518,7 @@ impl Blockstore { // Only used by tests #[allow(clippy::too_many_arguments)] - pub fn write_entries( + pub(crate) fn write_entries( &self, start_slot: Slot, num_ticks_in_start_slot: u64, @@ -1529,7 +1529,7 @@ impl Blockstore { keypair: &Arc, entries: Vec, version: u16, - ) -> Result { + ) -> Result { let mut parent_slot = parent.map_or(start_slot.saturating_sub(1), |v| v); let num_slots = (start_slot - parent_slot).max(1); // Note: slot 0 has parent slot 0 assert!(num_ticks_in_start_slot < num_slots * ticks_per_slot); @@ -1537,8 +1537,7 @@ impl Blockstore { let mut current_slot = start_slot; let mut shredder = - Shredder::new(current_slot, parent_slot, 0.0, keypair.clone(), 0, version) - .expect("Failed to create entry shredder"); + Shredder::new(current_slot, parent_slot, keypair.clone(), 0, version).unwrap(); let mut all_shreds = vec![]; let mut slot_entries = vec![]; // Find all the entries for start_slot @@ -1563,12 +1562,11 @@ impl Blockstore { shredder = Shredder::new( current_slot, parent_slot, - 0.0, keypair.clone(), (ticks_per_slot - remaining_ticks_in_slot) as u8, version, ) - .expect("Failed to create entry shredder"); + .unwrap(); } if entry.is_tick() { @@ -1583,10 +1581,9 @@ impl Blockstore { all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); } - - let num_shreds = all_shreds.len(); + let num_data = all_shreds.iter().filter(|shred| shred.is_data()).count(); self.insert_shreds(all_shreds, None, false)?; - Ok(num_shreds) + Ok(num_data) } pub fn get_index(&self, slot: Slot) -> Result> { @@ -3381,8 +3378,7 @@ pub fn create_new_ledger( let last_hash = entries.last().unwrap().hash; let version = solana_sdk::shred_version::version_from_hash(&last_hash); - let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()), 0, version) - .expect("Failed to create entry shredder"); + let shredder = Shredder::new(0, 0, Arc::new(Keypair::new()), 0, version).unwrap(); let shreds = shredder.entries_to_shreds(&entries, true, 0).0; assert!(shreds.last().unwrap().last_in_slot()); @@ -3558,10 +3554,10 @@ pub fn entries_to_test_shreds( is_full_slot: bool, version: u16, ) -> Vec { - let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()), 0, version) - .expect("Failed to create entry shredder"); - - shredder.entries_to_shreds(&entries, is_full_slot, 0).0 + Shredder::new(slot, parent_slot, Arc::new(Keypair::new()), 0, version) + .unwrap() + .entries_to_shreds(&entries, is_full_slot, 0) + .0 } // used for tests only @@ -7480,7 +7476,7 @@ pub mod tests { fn test_recovery() { let slot = 1; let (data_shreds, coding_shreds, leader_schedule_cache) = - setup_erasure_shreds(slot, 0, 100, 1.0); + setup_erasure_shreds(slot, 0, 100); let blockstore_path = get_tmp_ledger_path!(); { let blockstore = Blockstore::open(&blockstore_path).unwrap(); @@ -7513,7 +7509,7 @@ pub mod tests { let slot = 1; let num_entries = 100; let (data_shreds, coding_shreds, leader_schedule_cache) = - setup_erasure_shreds(slot, 0, num_entries, 1.0); + setup_erasure_shreds(slot, 0, num_entries); assert!(data_shreds.len() > 3); assert!(coding_shreds.len() > 3); let blockstore_path = get_tmp_ledger_path!(); @@ -7650,19 +7646,10 @@ pub mod tests { slot: u64, parent_slot: u64, num_entries: u64, - erasure_rate: f32, ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new( - slot, - parent_slot, - erasure_rate, - leader_keypair.clone(), - 0, - 0, - ) - .expect("Failed in creating shredder"); + let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap(); let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); let genesis_config = create_genesis_config(2).genesis_config; @@ -7714,8 +7701,7 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); - let shredder = - Shredder::new(slot, 0, 1.0, leader_keypair, 0, 0).expect("Failed in creating shredder"); + let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap(); let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0); let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0); let shred = shreds[0].clone(); @@ -8026,8 +8012,8 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let ledger = Blockstore::open(&ledger_path).unwrap(); - let coding1 = Shredder::generate_coding_shreds(0.5f32, &shreds, usize::MAX); - let coding2 = Shredder::generate_coding_shreds(1.0f32, &shreds, usize::MAX); + let coding1 = Shredder::generate_coding_shreds(&shreds, false); + let coding2 = Shredder::generate_coding_shreds(&shreds, true); for shred in &shreds { info!("shred {:?}", shred); } @@ -8051,7 +8037,7 @@ pub mod tests { solana_logger::setup(); let slot = 1; let (_data_shreds, mut coding_shreds, leader_schedule_cache) = - setup_erasure_shreds(slot, 0, 100, 1.0); + setup_erasure_shreds(slot, 0, 100); let blockstore_path = get_tmp_ledger_path!(); { let blockstore = Blockstore::open(&blockstore_path).unwrap(); diff --git a/ledger/src/erasure.rs b/ledger/src/erasure.rs index a1322724c3..455fdbcf64 100644 --- a/ledger/src/erasure.rs +++ b/ledger/src/erasure.rs @@ -104,10 +104,12 @@ impl Session { } /// Create coding blocks by overwriting `parity` - pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { - self.0.encode_sep(data, parity)?; - - Ok(()) + pub fn encode(&self, data: &[T], parity: &mut [U]) -> Result<()> + where + T: AsRef<[u8]>, + U: AsRef<[u8]> + AsMut<[u8]>, + { + self.0.encode_sep(data, parity) } /// Recover data + coding blocks into data blocks diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index b2e879ffc5..b7700a8e8e 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -141,7 +141,6 @@ pub const DATA_SHRED: u8 = 0b1010_0101; pub const CODING_SHRED: u8 = 0b0101_1010; pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; -pub const RECOMMENDED_FEC_RATE: f32 = 1.0; pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111; const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000; @@ -568,7 +567,6 @@ pub struct Shredder { pub slot: Slot, pub parent_slot: Slot, version: u16, - fec_rate: f32, keypair: Arc, pub signing_coding_time: u128, reference_tick: u8, @@ -578,21 +576,16 @@ impl Shredder { pub fn new( slot: Slot, parent_slot: Slot, - fec_rate: f32, keypair: Arc, reference_tick: u8, version: u16, ) -> Result { - #[allow(clippy::manual_range_contains)] - if fec_rate > 1.0 || fec_rate < 0.0 { - Err(ShredError::InvalidFecRate(fec_rate)) - } else if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) { + if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) { Err(ShredError::SlotTooLow { slot, parent_slot }) } else { Ok(Self { slot, parent_slot, - fec_rate, keypair, signing_coding_time: 0, reference_tick, @@ -618,7 +611,7 @@ impl Shredder { let coding_shreds = Self::data_shreds_to_coding_shreds( self.keypair.deref(), &data_shreds, - self.fec_rate, + is_last_in_slot, &mut stats, ) .unwrap(); @@ -697,12 +690,9 @@ impl Shredder { pub fn data_shreds_to_coding_shreds( keypair: &Keypair, data_shreds: &[Shred], - fec_rate: f32, + is_last_in_slot: bool, process_stats: &mut ProcessShredsStats, ) -> Result> { - if !(0.0..=1.0).contains(&fec_rate) { - return Err(ShredError::InvalidFecRate(fec_rate)); - } if data_shreds.is_empty() { return Ok(Vec::default()); } @@ -713,11 +703,7 @@ impl Shredder { data_shreds .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) .flat_map(|shred_data_batch| { - Shredder::generate_coding_shreds( - fec_rate, - shred_data_batch, - shred_data_batch.len(), // max_coding_shreds - ) + Shredder::generate_coding_shreds(shred_data_batch, is_last_in_slot) }) .collect() }) @@ -775,100 +761,53 @@ impl Shredder { } /// Generates coding shreds for the data shreds in the current FEC set - pub fn generate_coding_shreds( - fec_rate: f32, - data_shred_batch: &[Shred], - max_coding_shreds: usize, - ) -> Vec { - assert!(!data_shred_batch.is_empty()); - if fec_rate != 0.0 { - let num_data = data_shred_batch.len(); - // always generate at least 1 coding shred even if the fec_rate doesn't allow it - let num_coding = - Self::calculate_num_coding_shreds(num_data, fec_rate, max_coding_shreds); - let session = - Session::new(num_data, num_coding).expect("Failed to create erasure session"); - let ShredCommonHeader { - slot, - index: start_index, - version, - fec_set_index, - .. - } = data_shred_batch[0].common_header; - assert_eq!(fec_set_index, start_index); - assert!(data_shred_batch - .iter() - .all(|shred| shred.common_header.slot == slot - && shred.common_header.version == version - && shred.common_header.fec_set_index == fec_set_index)); - // All information (excluding the restricted section) from a data shred is encoded - let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; - let data_ptrs: Vec<_> = data_shred_batch - .iter() - .map(|data| &data.payload[..valid_data_len]) - .collect(); - - // Create empty coding shreds, with correctly populated headers - let mut coding_shreds: Vec<_> = (0..num_coding) - .map(|i| { - Shred::new_empty_coding( - slot, - start_index + i as u32, - fec_set_index, - num_data, - num_coding, - i, // position - version, - ) - .payload - }) - .collect(); - - // Grab pointers for the coding blocks; these come after the two headers - let coding_block_offset = SIZE_OF_CODING_SHRED_HEADERS; - let mut coding_ptrs: Vec<_> = coding_shreds - .iter_mut() - .map(|buffer| &mut buffer[coding_block_offset..]) - .collect(); - - // Create coding blocks - session - .encode(&data_ptrs, coding_ptrs.as_mut_slice()) - .expect("Failed in erasure encode"); - - // append to the shred list - coding_shreds - .into_iter() - .enumerate() - .map(|(i, payload)| { - let mut shred = Shred::new_empty_coding( - slot, - start_index + i as u32, - start_index, - num_data, - num_coding, - i, - version, - ); - shred.payload = payload; - shred - }) - .collect() + pub fn generate_coding_shreds(data: &[Shred], is_last_in_slot: bool) -> Vec { + const PAYLOAD_ENCODE_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; + let ShredCommonHeader { + slot, + index, + version, + fec_set_index, + .. + } = data.first().unwrap().common_header; + assert_eq!(fec_set_index, index); + assert!(data.iter().all(|shred| shred.common_header.slot == slot + && shred.common_header.version == version + && shred.common_header.fec_set_index == fec_set_index)); + let num_data = data.len(); + let num_coding = if is_last_in_slot { + (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) + .saturating_sub(num_data) + .max(num_data) } else { - vec![] - } - } - - fn calculate_num_coding_shreds( - num_data_shreds: usize, - fec_rate: f32, - max_coding_shreds: usize, - ) -> usize { - if num_data_shreds == 0 { - 0 - } else { - max_coding_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize)) - } + num_data + }; + let data: Vec<_> = data + .iter() + .map(|shred| &shred.payload[..PAYLOAD_ENCODE_SIZE]) + .collect(); + let mut parity = vec![vec![0; PAYLOAD_ENCODE_SIZE]; num_coding]; + Session::new(num_data, num_coding) + .unwrap() + .encode(&data, &mut parity[..]) + .unwrap(); + parity + .iter() + .enumerate() + .map(|(i, parity)| { + let mut shred = Shred::new_empty_coding( + slot, + fec_set_index + i as u32, // shred index + fec_set_index, + num_data, + num_coding, + i, // position + version, + ); + shred.payload[SIZE_OF_CODING_SHRED_HEADERS..].copy_from_slice(parity); + shred + }) + .collect() } fn fill_in_missing_shreds( @@ -1187,8 +1126,12 @@ pub mod tests { use super::*; use bincode::serialized_size; use matches::assert_matches; - use solana_sdk::{hash::hash, shred_version, system_transaction}; - use std::{collections::HashSet, convert::TryInto}; + use rand::{seq::SliceRandom, Rng}; + use solana_sdk::{ + hash::{self, hash}, + shred_version, system_transaction, + }; + use std::{collections::HashSet, convert::TryInto, iter::repeat_with}; #[test] fn test_shred_constants() { @@ -1243,7 +1186,7 @@ pub mod tests { // Test that parent cannot be > current slot assert_matches!( - Shredder::new(slot, slot + 1, 1.00, keypair.clone(), 0, 0), + Shredder::new(slot, slot + 1, keypair.clone(), 0, 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, @@ -1251,18 +1194,14 @@ pub mod tests { ); // Test that slot - parent cannot be > u16 MAX assert_matches!( - Shredder::new(slot, slot - 1 - 0xffff, 1.00, keypair.clone(), 0, 0), + Shredder::new(slot, slot - 1 - 0xffff, keypair.clone(), 0, 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, }) ); - - let fec_rate = 0.25; let parent_slot = slot - 5; - let shredder = Shredder::new(slot, parent_slot, fec_rate, keypair.clone(), 0, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, parent_slot, keypair.clone(), 0, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1277,12 +1216,9 @@ pub mod tests { // Integer division to ensure we have enough shreds to fit all the data let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD as u64; let num_expected_data_shreds = (size + payload_capacity - 1) / payload_capacity; - let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds( - num_expected_data_shreds as usize, - fec_rate, - num_expected_data_shreds as usize, - ); - + let num_expected_coding_shreds = (2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) + .saturating_sub(num_expected_data_shreds as usize) + .max(num_expected_data_shreds as usize); let start_index = 0; let (data_shreds, coding_shreds, next_index) = shredder.entries_to_shreds(&entries, true, start_index); @@ -1342,11 +1278,8 @@ pub mod tests { fn test_deserialize_shred_payload() { let keypair = Arc::new(Keypair::new()); let slot = 1; - let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, 0.0, keypair, 0, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, parent_slot, keypair, 0, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1368,11 +1301,8 @@ pub mod tests { fn test_shred_reference_tick() { let keypair = Arc::new(Keypair::new()); let slot = 1; - let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, 0.0, keypair, 5, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, parent_slot, keypair, 5, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1398,11 +1328,8 @@ pub mod tests { fn test_shred_reference_tick_overflow() { let keypair = Arc::new(Keypair::new()); let slot = 1; - let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, 0.0, keypair, u8::max_value(), 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, parent_slot, keypair, u8::max_value(), 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1432,16 +1359,7 @@ pub mod tests { fn run_test_data_and_code_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - - // Test that FEC rate cannot be > 1.0 - assert_matches!( - Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0, 0), - Err(ShredError::InvalidFecRate(_)) - ); - - let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); // Create enough entries to make > 1 shred let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD; let num_entries = max_ticks_per_n_shreds(1, Some(payload_capacity)) + 1; @@ -1480,11 +1398,9 @@ pub mod tests { run_test_data_and_code_shredder(0x1234_5678_9abc_def0); } - fn run_test_recovery_and_reassembly(slot: Slot) { + fn run_test_recovery_and_reassembly(slot: Slot, is_last_in_slot: bool) { let keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); @@ -1505,13 +1421,24 @@ pub mod tests { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + &entries, + is_last_in_slot, + 0, // next_shred_index + ); let num_coding_shreds = coding_shreds.len(); // We should have 5 data shreds now assert_eq!(data_shreds.len(), num_data_shreds); - // and an equal number of coding shreds - assert_eq!(num_data_shreds, num_coding_shreds); + if is_last_in_slot { + assert_eq!( + num_coding_shreds, + 2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - num_data_shreds + ); + } else { + // and an equal number of coding shreds + assert_eq!(num_data_shreds, num_coding_shreds); + } let all_shreds = data_shreds .iter() @@ -1612,6 +1539,7 @@ pub mod tests { assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { let index = i * 2; + let is_last_data = recovered_shred.index() as usize == num_data_shreds - 1; verify_test_data_shred( &recovered_shred, index.try_into().unwrap(), @@ -1619,8 +1547,8 @@ pub mod tests { slot - 5, &keypair.pubkey(), true, - recovered_shred.index() as usize == num_data_shreds - 1, - recovered_shred.index() as usize == num_data_shreds - 1, + is_last_data && is_last_in_slot, + is_last_data, ); shred_info.insert(i * 2, recovered_shred); @@ -1736,7 +1664,82 @@ pub mod tests { #[test] fn test_recovery_and_reassembly() { - run_test_recovery_and_reassembly(0x1234_5678_9abc_def0); + run_test_recovery_and_reassembly(0x1234_5678_9abc_def0, false); + run_test_recovery_and_reassembly(0x1234_5678_9abc_def0, true); + } + + fn run_recovery_with_expanded_coding_shreds(num_tx: usize, is_last_in_slot: bool) { + let mut rng = rand::thread_rng(); + let txs = repeat_with(|| { + system_transaction::transfer( + &Keypair::new(), // from + &Pubkey::new_unique(), // to + rng.gen(), // lamports + hash::new_rand(&mut rng), // recent block hash + ) + }) + .take(num_tx) + .collect(); + let entry = Entry::new( + &hash::new_rand(&mut rng), // prev hash + rng.gen_range(1, 64), // num hashes + txs, + ); + let keypair = Arc::new(Keypair::new()); + let slot = 71489660; + let shredder = Shredder::new( + slot, + slot - rng.gen_range(1, 27), // parent slot + keypair, + 0, // reference tick + rng.gen(), // version + ) + .unwrap(); + let next_shred_index = rng.gen_range(1, 1024); + let (data_shreds, coding_shreds, _) = + shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index); + let num_data_shreds = data_shreds.len(); + let num_coding_shreds = coding_shreds.len(); + let mut shreds = coding_shreds; + shreds.extend(data_shreds.iter().cloned()); + shreds.shuffle(&mut rng); + shreds.truncate(num_data_shreds); + shreds.sort_by_key(|shred| { + if shred.is_data() { + shred.index() + } else { + shred.index() + num_data_shreds as u32 + } + }); + let exclude: HashSet<_> = shreds + .iter() + .filter(|shred| shred.is_data()) + .map(|shred| shred.index()) + .collect(); + let recovered_shreds = Shredder::try_recovery( + shreds, + num_data_shreds, + num_coding_shreds, + next_shred_index as usize, // first index + next_shred_index as usize, // first code index + slot, + ) + .unwrap(); + assert_eq!( + recovered_shreds, + data_shreds + .into_iter() + .filter(|shred| !exclude.contains(&shred.index())) + .collect::>() + ); + } + + #[test] + fn test_recovery_with_expanded_coding_shreds() { + for num_tx in 0..100 { + run_recovery_with_expanded_coding_shreds(num_tx, false); + run_recovery_with_expanded_coding_shreds(num_tx, true); + } } #[test] @@ -1745,9 +1748,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = - Shredder::new(0, 0, 1.0, keypair, 0, version).expect("Failed in creating shredder"); - + let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1795,9 +1796,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = - Shredder::new(0, 0, 0.5, keypair, 0, version).expect("Failed in creating shredder"); - + let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); let entries: Vec<_> = (0..500) .map(|_| { let keypair0 = Keypair::new(); @@ -1819,10 +1818,10 @@ pub mod tests { }); coding_shreds.iter().enumerate().for_each(|(i, s)| { - // There will be half the number of coding shreds, as FEC rate is 0.5 - // So multiply i with 2 - let expected_fec_set_index = - start_index + ((i * 2 / max_per_block) * max_per_block) as u32; + let mut expected_fec_set_index = start_index + (i - i % max_per_block) as u32; + while expected_fec_set_index as usize > data_shreds.len() { + expected_fec_set_index -= max_per_block as u32; + } assert_eq!(s.common_header.fec_set_index, expected_fec_set_index); }); } @@ -1833,9 +1832,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = - Shredder::new(0, 0, 1.0, keypair, 0, version).expect("Failed in creating shredder"); - + let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); let entries: Vec<_> = (0..500) .map(|_| { let keypair0 = Keypair::new(); @@ -1862,17 +1859,28 @@ pub mod tests { let coding_shreds = Shredder::data_shreds_to_coding_shreds( shredder.keypair.deref(), &data_shreds[..count], - shredder.fec_rate, + false, // is_last_in_slot &mut stats, ) .unwrap(); assert_eq!(coding_shreds.len(), count); + let coding_shreds = Shredder::data_shreds_to_coding_shreds( + shredder.keypair.deref(), + &data_shreds[..count], + true, // is_last_in_slot + &mut stats, + ) + .unwrap(); + assert_eq!( + coding_shreds.len(), + 2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - count + ); }); let coding_shreds = Shredder::data_shreds_to_coding_shreds( shredder.keypair.deref(), &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], - shredder.fec_rate, + false, // is_last_in_slot &mut stats, ) .unwrap(); @@ -1880,6 +1888,17 @@ pub mod tests { coding_shreds.len(), MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1 ); + let coding_shreds = Shredder::data_shreds_to_coding_shreds( + shredder.keypair.deref(), + &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], + true, // is_last_in_slot + &mut stats, + ) + .unwrap(); + assert_eq!( + coding_shreds.len(), + 3 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1 + ); } #[test] diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 80a720da12..f659f64348 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -22,9 +22,7 @@ type IndexShredsMap = BTreeMap>; fn test_multi_fec_block_coding() { let keypair = Arc::new(Keypair::new()); let slot = 0x1234_5678_9abc_def0; - let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) - .expect("Failed in creating shredder"); - + let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); let num_fec_sets = 100; let num_data_shreds = (MAX_DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets) as usize; let keypair0 = Keypair::new(); @@ -200,8 +198,7 @@ fn setup_different_sized_fec_blocks( parent_slot: Slot, keypair: Arc, ) -> (IndexShredsMap, IndexShredsMap, usize) { - let shredder = - Shredder::new(slot, parent_slot, 1.0, keypair, 0, 0).expect("Failed in creating shredder"); + let shredder = Shredder::new(slot, parent_slot, keypair, 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());