Generate max coding shreds when necessary (#8099)
* Generate max coding shreds when necessary * test
This commit is contained in:
		@@ -5333,7 +5333,7 @@ pub mod tests {
 | 
			
		||||
                .iter()
 | 
			
		||||
                .cloned()
 | 
			
		||||
                .chain(
 | 
			
		||||
                    coding_shreds[coding_shreds.len() / 2 - 1..data_shreds.len() / 2]
 | 
			
		||||
                    coding_shreds[coding_shreds.len() / 2 - 1..coding_shreds.len() / 2]
 | 
			
		||||
                        .iter()
 | 
			
		||||
                        .cloned(),
 | 
			
		||||
                )
 | 
			
		||||
 
 | 
			
		||||
@@ -194,19 +194,17 @@ impl ErasureMeta {
 | 
			
		||||
            .data()
 | 
			
		||||
            .present_in_bounds(self.set_index..self.set_index + self.config.num_data() as u64);
 | 
			
		||||
 | 
			
		||||
        let (data_missing, coding_missing) = (
 | 
			
		||||
            self.config.num_data() - num_data,
 | 
			
		||||
            self.config.num_coding() - num_coding,
 | 
			
		||||
        let (data_missing, num_needed) = (
 | 
			
		||||
            self.config.num_data().saturating_sub(num_data),
 | 
			
		||||
            self.config.num_data().saturating_sub(num_data + num_coding),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let total_missing = data_missing + coding_missing;
 | 
			
		||||
 | 
			
		||||
        if data_missing > 0 && total_missing <= self.config.num_coding() {
 | 
			
		||||
            CanRecover
 | 
			
		||||
        } else if data_missing == 0 {
 | 
			
		||||
        if data_missing == 0 {
 | 
			
		||||
            DataFull
 | 
			
		||||
        } else if num_needed == 0 {
 | 
			
		||||
            CanRecover
 | 
			
		||||
        } else {
 | 
			
		||||
            StillNeed(total_missing - self.config.num_coding())
 | 
			
		||||
            StillNeed(num_needed)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -529,6 +529,7 @@ impl Shredder {
 | 
			
		||||
 | 
			
		||||
    pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec<Shred> {
 | 
			
		||||
        let now = Instant::now();
 | 
			
		||||
        let max_coding_shreds = data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
 | 
			
		||||
        // 2) Generate coding shreds
 | 
			
		||||
        let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
 | 
			
		||||
            thread_pool.borrow().install(|| {
 | 
			
		||||
@@ -540,6 +541,7 @@ impl Shredder {
 | 
			
		||||
                            self.fec_rate,
 | 
			
		||||
                            shred_data_batch,
 | 
			
		||||
                            self.version,
 | 
			
		||||
                            max_coding_shreds,
 | 
			
		||||
                        )
 | 
			
		||||
                    })
 | 
			
		||||
                    .collect()
 | 
			
		||||
@@ -607,12 +609,25 @@ impl Shredder {
 | 
			
		||||
        fec_rate: f32,
 | 
			
		||||
        data_shred_batch: &[Shred],
 | 
			
		||||
        version: u16,
 | 
			
		||||
        max_coding_shreds: bool,
 | 
			
		||||
    ) -> Vec<Shred> {
 | 
			
		||||
        assert!(!data_shred_batch.is_empty());
 | 
			
		||||
        if fec_rate != 0.0 {
 | 
			
		||||
            let num_data = data_shred_batch.len();
 | 
			
		||||
            // always generate at least 1 coding shred even if the fec_rate doesn't allow it
 | 
			
		||||
            let num_coding = Self::calculate_num_coding_shreds(num_data as f32, fec_rate);
 | 
			
		||||
            let shred_count = if max_coding_shreds {
 | 
			
		||||
                MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
 | 
			
		||||
            } else {
 | 
			
		||||
                num_data
 | 
			
		||||
            };
 | 
			
		||||
            let num_coding = Self::calculate_num_coding_shreds(shred_count as f32, fec_rate);
 | 
			
		||||
            if num_coding > num_data {
 | 
			
		||||
                trace!(
 | 
			
		||||
                    "Generated more codes ({}) than data shreds ({})",
 | 
			
		||||
                    num_coding,
 | 
			
		||||
                    num_data
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            let session =
 | 
			
		||||
                Session::new(num_data, num_coding).expect("Failed to create erasure session");
 | 
			
		||||
            let start_index = data_shred_batch[0].common_header.index;
 | 
			
		||||
@@ -1162,9 +1177,6 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
 | 
			
		||||
 | 
			
		||||
        // Must have created an equal number of coding and data shreds
 | 
			
		||||
        assert_eq!(data_shreds.len(), coding_shreds.len());
 | 
			
		||||
 | 
			
		||||
        for (i, s) in data_shreds.iter().enumerate() {
 | 
			
		||||
            verify_test_data_shred(
 | 
			
		||||
                s,
 | 
			
		||||
@@ -1209,10 +1221,10 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
        let serialized_entries = bincode::serialize(&entries).unwrap();
 | 
			
		||||
        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
 | 
			
		||||
        let num_coding_shreds = coding_shreds.len();
 | 
			
		||||
 | 
			
		||||
        // We should have 10 shreds now, an equal number of coding shreds
 | 
			
		||||
        assert_eq!(data_shreds.len(), num_data_shreds);
 | 
			
		||||
        assert_eq!(coding_shreds.len(), num_data_shreds);
 | 
			
		||||
 | 
			
		||||
        let all_shreds = data_shreds
 | 
			
		||||
            .iter()
 | 
			
		||||
@@ -1225,7 +1237,7 @@ pub mod tests {
 | 
			
		||||
            Shredder::try_recovery(
 | 
			
		||||
                data_shreds[..data_shreds.len() - 1].to_vec(),
 | 
			
		||||
                num_data_shreds,
 | 
			
		||||
                num_data_shreds,
 | 
			
		||||
                num_coding_shreds,
 | 
			
		||||
                0,
 | 
			
		||||
                0,
 | 
			
		||||
                slot
 | 
			
		||||
@@ -1237,7 +1249,7 @@ pub mod tests {
 | 
			
		||||
        let recovered_data = Shredder::try_recovery(
 | 
			
		||||
            data_shreds[..].to_vec(),
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_coding_shreds,
 | 
			
		||||
            0,
 | 
			
		||||
            0,
 | 
			
		||||
            slot,
 | 
			
		||||
@@ -1255,7 +1267,7 @@ pub mod tests {
 | 
			
		||||
        let mut recovered_data = Shredder::try_recovery(
 | 
			
		||||
            shred_info.clone(),
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_coding_shreds,
 | 
			
		||||
            0,
 | 
			
		||||
            0,
 | 
			
		||||
            slot,
 | 
			
		||||
@@ -1303,7 +1315,7 @@ pub mod tests {
 | 
			
		||||
        let recovered_data = Shredder::try_recovery(
 | 
			
		||||
            shred_info.clone(),
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_coding_shreds,
 | 
			
		||||
            0,
 | 
			
		||||
            0,
 | 
			
		||||
            slot,
 | 
			
		||||
@@ -1356,10 +1368,9 @@ pub mod tests {
 | 
			
		||||
        // and 2 missing coding shreds. Hint: should work
 | 
			
		||||
        let serialized_entries = bincode::serialize(&entries).unwrap();
 | 
			
		||||
        let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 25);
 | 
			
		||||
 | 
			
		||||
        // We should have 10 shreds now, an equal number of coding shreds
 | 
			
		||||
        let num_coding_shreds = coding_shreds.len();
 | 
			
		||||
        // We should have 10 shreds now
 | 
			
		||||
        assert_eq!(data_shreds.len(), num_data_shreds);
 | 
			
		||||
        assert_eq!(coding_shreds.len(), num_data_shreds);
 | 
			
		||||
 | 
			
		||||
        let all_shreds = data_shreds
 | 
			
		||||
            .iter()
 | 
			
		||||
@@ -1376,7 +1387,7 @@ pub mod tests {
 | 
			
		||||
        let recovered_data = Shredder::try_recovery(
 | 
			
		||||
            shred_info.clone(),
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_coding_shreds,
 | 
			
		||||
            25,
 | 
			
		||||
            25,
 | 
			
		||||
            slot,
 | 
			
		||||
@@ -1408,7 +1419,7 @@ pub mod tests {
 | 
			
		||||
        let recovered_data = Shredder::try_recovery(
 | 
			
		||||
            shred_info.clone(),
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_data_shreds,
 | 
			
		||||
            num_coding_shreds,
 | 
			
		||||
            25,
 | 
			
		||||
            25,
 | 
			
		||||
            slot + 1,
 | 
			
		||||
@@ -1421,7 +1432,7 @@ pub mod tests {
 | 
			
		||||
            Shredder::try_recovery(
 | 
			
		||||
                shred_info.clone(),
 | 
			
		||||
                num_data_shreds,
 | 
			
		||||
                num_data_shreds,
 | 
			
		||||
                num_coding_shreds,
 | 
			
		||||
                15,
 | 
			
		||||
                15,
 | 
			
		||||
                slot,
 | 
			
		||||
@@ -1431,7 +1442,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
        // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
 | 
			
		||||
        assert_matches!(
 | 
			
		||||
            Shredder::try_recovery(shred_info, num_data_shreds, num_data_shreds, 35, 35, slot,),
 | 
			
		||||
            Shredder::try_recovery(shred_info, num_data_shreds, num_coding_shreds, 35, 35, slot,),
 | 
			
		||||
            Err(reed_solomon_erasure::Error::TooFewShardsPresent)
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
@@ -1523,4 +1534,45 @@ pub mod tests {
 | 
			
		||||
            assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_max_coding_shreds() {
 | 
			
		||||
        let keypair = Arc::new(Keypair::new());
 | 
			
		||||
        let hash = hash(Hash::default().as_ref());
 | 
			
		||||
        let version = Shred::version_from_hash(&hash);
 | 
			
		||||
        assert_ne!(version, 0);
 | 
			
		||||
        let shredder =
 | 
			
		||||
            Shredder::new(0, 0, 1.0, keypair, 0, version).expect("Failed in creating shredder");
 | 
			
		||||
 | 
			
		||||
        let entries: Vec<_> = (0..500)
 | 
			
		||||
            .map(|_| {
 | 
			
		||||
                let keypair0 = Keypair::new();
 | 
			
		||||
                let keypair1 = Keypair::new();
 | 
			
		||||
                let tx0 =
 | 
			
		||||
                    system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
 | 
			
		||||
                Entry::new(&Hash::default(), 1, vec![tx0])
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        let start_index = 0x12;
 | 
			
		||||
        let (data_shreds, _next_index) =
 | 
			
		||||
            shredder.entries_to_data_shreds(&entries, true, start_index);
 | 
			
		||||
 | 
			
		||||
        assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize);
 | 
			
		||||
 | 
			
		||||
        (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .for_each(|count| {
 | 
			
		||||
                let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[..count]);
 | 
			
		||||
                assert_eq!(coding_shreds.len(), count);
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        let coding_shreds = shredder.data_shreds_to_coding_shreds(
 | 
			
		||||
            &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            coding_shreds.len(),
 | 
			
		||||
            MAX_DATA_SHREDS_PER_FEC_BLOCK as usize * 2
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user