From 439fd30840e79e1c8ea94e40a17f4b93cbedf330 Mon Sep 17 00:00:00 2001
From: carllin
Date: Tue, 19 May 2020 16:13:12 -0700
Subject: [PATCH] Fix erasure (#10095)

* Fix bad FEC blocks

* Add test
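
The old path set max_coding_shreds whenever a slot produced more data
shreds than MAX_DATA_SHREDS_PER_FEC_BLOCK, so every batch passed to
generate_coding_shreds, including a trailing partial one, sized its
erasure session from MAX_DATA_SHREDS_PER_FEC_BLOCK rather than from the
batch itself: with fec_rate = 1.0, a final batch of 2 data shreds got
MAX_DATA_SHREDS_PER_FEC_BLOCK coding shreds, i.e. a bad FEC block.
calculate_num_coding_shreds now takes the actual batch size and clamps
the result to the number of data shreds, so a non-empty batch of N data
shreds gets between 1 and N coding shreds.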

Co-authored-by: Carl
---
 core/src/shred_fetch_stage.rs |   9 +-
 ledger/src/shred.rs           |  45 +++------
 ledger/tests/shred.rs         | 180 ++++++++++++++++++++++++++++++++--
 3 files changed, 189 insertions(+), 45 deletions(-)

diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 22a2499e00..570848834c 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -293,13 +293,8 @@ mod tests {
         );
         assert!(!packet.meta.discard);
 
-        let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
-            slot,
-            1.0f32,
-            &[shred],
-            10,
-            false,
-        );
+        let coding =
+            solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10);
         coding[0].copy_to_packet(&mut packet);
         ShredFetchStage::process_packet(
             &mut packet,
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 7bccf4fd4a..8a658ee5d3 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -20,8 +20,8 @@ use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, Signature, Signer},
 };
-use std::mem::size_of;
-use std::{sync::Arc, time::Instant};
+use std::{mem::size_of, sync::Arc, time::Instant};
+
 use thiserror::Error;
 
 pub type Nonce = u32;
@@ -169,7 +169,7 @@ impl Shred {
         index: u32,
         parent_offset: u16,
         data: Option<&[u8]>,
-        is_last_data: bool,
+        is_last_in_fec_set: bool,
         is_last_in_slot: bool,
         reference_tick: u8,
         version: u16,
@@ -194,7 +194,7 @@ impl Shred {
             size,
         };
 
-        if is_last_data {
+        if is_last_in_fec_set {
             data_header.flags |= DATA_COMPLETE_SHRED
         }
 
@@ -496,7 +496,6 @@ impl Shredder {
         let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
         let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
         let last_shred_index = next_shred_index + num_shreds as u32 - 1;
-
        // 1) Generate data shreds
         let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -511,7 +510,7 @@ impl Shredder {
                        let fec_set_index =
                            shred_index - (i % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) as u32;

-                        let (is_last_data, is_last_in_slot) = {
+                        let (is_last_in_fec_set, is_last_in_slot) = {
                            if shred_index == last_shred_index {
                                (true, is_last_in_slot)
                            } else {
@@ -524,7 +523,7 @@ impl Shredder {
                            shred_index,
                            (self.slot - self.parent_slot) as u16,
                            Some(shred_data),
-                            is_last_data,
+                            is_last_in_fec_set,
                            is_last_in_slot,
                            self.reference_tick,
                            self.version,
@@ -550,7 +549,6 @@ impl Shredder {
 
     pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec<Shred> {
         let now = Instant::now();
-        let max_coding_shreds = data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
         // 2) Generate coding shreds
         let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
@@ -562,7 +560,6 @@ impl Shredder {
                         self.fec_rate,
                         shred_data_batch,
                         self.version,
-                        max_coding_shreds,
                     )
                 })
                 .collect()
@@ -630,25 +627,12 @@ impl Shredder {
         fec_rate: f32,
         data_shred_batch: &[Shred],
         version: u16,
-        max_coding_shreds: bool,
     ) -> Vec<Shred> {
         assert!(!data_shred_batch.is_empty());
         if fec_rate != 0.0 {
             let num_data = data_shred_batch.len();
             // always generate at least 1 coding shred even if the fec_rate doesn't allow it
-            let shred_count = if max_coding_shreds {
-                MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
-            } else {
-                num_data
-            };
-            let num_coding = Self::calculate_num_coding_shreds(shred_count as f32, fec_rate);
-            if num_coding > num_data {
-                trace!(
-                    "Generated more codes ({}) than data shreds ({})",
-                    num_coding,
-                    num_data
-                );
-            }
+            let num_coding = Self::calculate_num_coding_shreds(num_data, fec_rate);
             let session =
                 Session::new(num_data, num_coding).expect("Failed to create erasure session");
             let start_index = data_shred_batch[0].common_header.index;
@@ -716,8 +700,12 @@ impl Shredder {
         }
     }
 
-    fn calculate_num_coding_shreds(num_data_shreds: f32, fec_rate: f32) -> usize {
-        1.max((fec_rate * num_data_shreds) as usize)
+    fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize {
+        if num_data_shreds == 0 {
+            0
+        } else {
+            num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
+        }
     }
 
     fn fill_in_missing_shreds(
@@ -971,8 +959,7 @@ pub mod tests {
     use bincode::serialized_size;
     use matches::assert_matches;
     use solana_sdk::{hash::hash, shred_version, system_transaction};
-    use std::collections::HashSet;
-    use std::convert::TryInto;
+    use std::{collections::HashSet, convert::TryInto};
 
     #[test]
     fn test_shred_constants() {
@@ -1061,7 +1048,7 @@ pub mod tests {
         let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
         let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
         let num_expected_coding_shreds =
-            Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate);
+            Shredder::calculate_num_coding_shreds(num_expected_data_shreds as usize, fec_rate);
         let start_index = 0;
         let (data_shreds, coding_shreds, next_index) =
@@ -1640,7 +1627,7 @@ pub mod tests {
         );
         assert_eq!(
             coding_shreds.len(),
-            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize * 2
+            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1
         );
     }
diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs
index 505f745f66..3077cc1879 100644
--- a/ledger/tests/shred.rs
+++ b/ledger/tests/shred.rs
@@ -3,13 +3,22 @@ use solana_ledger::shred::{
     max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder,
     MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
 };
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::{clock::Slot, hash::Hash, system_transaction};
-use std::convert::TryInto;
-use std::sync::Arc;
+use solana_sdk::{
+    clock::Slot,
+    hash::Hash,
+    signature::{Keypair, Signer},
+    system_transaction,
+};
+use std::{
+    collections::{BTreeMap, HashSet},
+    convert::TryInto,
+    sync::Arc,
+};
 
-fn run_test_multi_fec_block_coding(slot: Slot) {
+#[test]
+fn test_multi_fec_block_coding() {
     let keypair = Arc::new(Keypair::new());
+    let slot = 0x1234_5678_9abc_def0;
     let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0)
         .expect("Failed in creating shredder");
@@ -19,8 +28,11 @@ fn run_test_multi_fec_block_coding(slot: Slot) {
     let keypair0 = Keypair::new();
     let keypair1 = Keypair::new();
     let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
     let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
-    let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
-    let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size));
+    let num_entries = max_entries_per_n_shred(
+        &entry,
+        num_data_shreds as u64,
+        Some(SIZE_OF_DATA_SHRED_PAYLOAD),
+    );
 
     let entries: Vec<_> = (0..num_entries)
         .map(|_| {
@@ -96,6 +108,156 @@
 }
 
 #[test]
-fn test_multi_fec_block_coding() {
-    run_test_multi_fec_block_coding(0x1234_5678_9abc_def0);
+fn test_multi_fec_block_different_size_coding() {
+    let slot = 0x1234_5678_9abc_def0;
+    let parent_slot = slot - 5;
+    let keypair = Arc::new(Keypair::new());
+    let (fec_data, fec_coding, num_shreds_per_iter) =
+        setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone());
+
+    let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum();
+    // Test recovery
+    for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) {
+        let first_data_index = fec_data_shreds.first().unwrap().index() as usize;
+        let first_code_index = fec_coding_shreds.first().unwrap().index() as usize;
+        let num_data = fec_data_shreds.len();
+        let num_coding = fec_coding_shreds.len();
+        let all_shreds: Vec<Shred> = fec_data_shreds
+            .into_iter()
+            .step_by(2)
+            .chain(fec_coding_shreds.into_iter().step_by(2))
+            .cloned()
+            .collect();
+
+        let recovered_data = Shredder::try_recovery(
+            all_shreds,
+            num_data,
+            num_coding,
+            first_data_index,
+            first_code_index,
+            slot,
+        )
+        .unwrap();
+
+        // Necessary in order to ensure the last shred in the slot
+        // is part of the recovered set, and that the below `index`
+        // calculation in the loop is correct
+        assert!(fec_data_shreds.len() % 2 == 0);
+        for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
+            let index = first_data_index + (i * 2) + 1;
+            verify_test_data_shred(
+                &recovered_shred,
+                index.try_into().unwrap(),
+                slot,
+                parent_slot,
+                &keypair.pubkey(),
+                true,
+                index == total_num_data_shreds - 1,
+                index % num_shreds_per_iter == num_shreds_per_iter - 1,
+            );
+        }
+    }
+}
+
+fn sort_data_coding_into_fec_sets(
+    data_shreds: Vec<Shred>,
+    coding_shreds: Vec<Shred>,
+    fec_data: &mut BTreeMap<u32, Vec<Shred>>,
+    fec_coding: &mut BTreeMap<u32, Vec<Shred>>,
+    data_slot_and_index: &mut HashSet<(Slot, u32)>,
+    coding_slot_and_index: &mut HashSet<(Slot, u32)>,
+) {
+    for shred in data_shreds {
+        assert!(shred.is_data());
+        let key = (shred.slot(), shred.index());
+        // Make sure there are no duplicates for same key
+        assert!(!data_slot_and_index.contains(&key));
+        data_slot_and_index.insert(key);
+        let fec_entry = fec_data
+            .entry(shred.common_header.fec_set_index)
+            .or_insert(vec![]);
+        fec_entry.push(shred);
+    }
+    for shred in coding_shreds {
+        assert!(!shred.is_data());
+        let key = (shred.slot(), shred.index());
+        // Make sure there are no duplicates for same key
+        assert!(!coding_slot_and_index.contains(&key));
+        coding_slot_and_index.insert(key);
+        let fec_entry = fec_coding
+            .entry(shred.common_header.fec_set_index)
+            .or_insert(vec![]);
+        fec_entry.push(shred);
+    }
+}
+
+fn setup_different_sized_fec_blocks(
+    slot: Slot,
+    parent_slot: Slot,
+    keypair: Arc<Keypair>,
+) -> (BTreeMap<u32, Vec<Shred>>, BTreeMap<u32, Vec<Shred>>, usize) {
+    let shredder =
+        Shredder::new(slot, parent_slot, 1.0, keypair, 0, 0).expect("Failed in creating shredder");
+    let keypair0 = Keypair::new();
+    let keypair1 = Keypair::new();
+    let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+    let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
+
+    // Make enough entries for `MAX_DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one
+    // fec set will have `MAX_DATA_SHREDS_PER_FEC_BLOCK` shreds and the next
+    // will have 2 shreds.
+    assert!(MAX_DATA_SHREDS_PER_FEC_BLOCK > 2);
+    let num_shreds_per_iter = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 2;
+    let num_entries = max_entries_per_n_shred(
+        &entry,
+        num_shreds_per_iter as u64,
+        Some(SIZE_OF_DATA_SHRED_PAYLOAD),
+    );
+    let entries: Vec<_> = (0..num_entries)
+        .map(|_| {
+            let keypair0 = Keypair::new();
+            let keypair1 = Keypair::new();
+            let tx0 =
+                system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+            Entry::new(&Hash::default(), 1, vec![tx0])
+        })
+        .collect();
+
+    // Run the shredder twice, generate data and coding shreds
+    let mut next_index = 0;
+    let mut fec_data = BTreeMap::new();
+    let mut fec_coding = BTreeMap::new();
+    let mut data_slot_and_index = HashSet::new();
+    let mut coding_slot_and_index = HashSet::new();
+
+    let total_num_data_shreds: usize = 2 * num_shreds_per_iter;
+    for i in 0..2 {
+        let is_last = i == 1;
+        let (data_shreds, coding_shreds, new_next_index) =
+            shredder.entries_to_shreds(&entries, is_last, next_index);
+        for shred in &data_shreds {
+            if (shred.index() as usize) == total_num_data_shreds - 1 {
+                assert!(shred.data_complete());
+                assert!(shred.last_in_slot());
+            } else if (shred.index() as usize) % num_shreds_per_iter == num_shreds_per_iter - 1 {
+                assert!(shred.data_complete());
+            } else {
+                assert!(!shred.data_complete());
+                assert!(!shred.last_in_slot());
+            }
+        }
+        assert_eq!(data_shreds.len(), num_shreds_per_iter as usize);
+        next_index = new_next_index;
+        sort_data_coding_into_fec_sets(
+            data_shreds,
+            coding_shreds,
+            &mut fec_data,
+            &mut fec_coding,
+            &mut data_slot_and_index,
+            &mut coding_slot_and_index,
+        );
+    }
+
+    assert_eq!(fec_data.len(), fec_coding.len());
+    (fec_data, fec_coding, num_shreds_per_iter)
+}
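
A minimal standalone sketch of the patched coding-shred accounting, for
trying the boundary cases outside the crate. The constant's value below is
an assumption for illustration only; the function body mirrors the patched
calculate_num_coding_shreds above.

    // Sketch only; mirrors Shredder::calculate_num_coding_shreds from the patch.
    const MAX_DATA_SHREDS_PER_FEC_BLOCK: usize = 32; // assumed value, illustration only

    fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize {
        if num_data_shreds == 0 {
            0
        } else {
            // At least one coding shred, but never more coding shreds than
            // data shreds in the same batch.
            num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize))
        }
    }

    fn main() {
        // A full FEC block keeps its full complement of coding shreds ...
        assert_eq!(
            calculate_num_coding_shreds(MAX_DATA_SHREDS_PER_FEC_BLOCK, 1.0),
            MAX_DATA_SHREDS_PER_FEC_BLOCK
        );
        // ... while a trailing 2-shred batch now gets 2 coding shreds instead
        // of the MAX_DATA_SHREDS_PER_FEC_BLOCK it received before this patch.
        assert_eq!(calculate_num_coding_shreds(2, 1.0), 2);
        assert_eq!(calculate_num_coding_shreds(0, 1.0), 0);
    }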