Integrate coding shreds and recovery (#5625)
* Integrate coding shreds and recovery * More tests for shreds and some fixes * address review comments * fixes to code shred generation * unignore tests * fixes to recovery
This commit is contained in:
		| @@ -1,7 +1,6 @@ | ||||
| //! The `block_tree` module provides functions for parallel verification of the | ||||
| //! Proof of History ledger as well as iterative read, append write, and random | ||||
| //! access read to a persistent file-based ledger. | ||||
| use crate::broadcast_stage::broadcast_utils::entries_to_shreds; | ||||
| use crate::entry::Entry; | ||||
| use crate::erasure::{ErasureConfig, Session}; | ||||
| use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; | ||||
| @@ -37,6 +36,7 @@ use std::sync::{Arc, RwLock}; | ||||
| pub use self::meta::*; | ||||
| pub use self::rooted_slot_iterator::*; | ||||
| use solana_sdk::timing::Slot; | ||||
| use std::io::Write; | ||||
|  | ||||
| mod db; | ||||
| mod meta; | ||||
| @@ -94,7 +94,7 @@ pub struct Blocktree { | ||||
|     orphans_cf: LedgerColumn<cf::Orphans>, | ||||
|     index_cf: LedgerColumn<cf::Index>, | ||||
|     data_shred_cf: LedgerColumn<cf::ShredData>, | ||||
|     _code_shred_cf: LedgerColumn<cf::ShredCode>, | ||||
|     code_shred_cf: LedgerColumn<cf::ShredCode>, | ||||
|     batch_processor: Arc<RwLock<BatchProcessor>>, | ||||
|     pub new_blobs_signals: Vec<SyncSender<bool>>, | ||||
|     pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>, | ||||
| @@ -166,7 +166,7 @@ impl Blocktree { | ||||
|             orphans_cf, | ||||
|             index_cf, | ||||
|             data_shred_cf, | ||||
|             _code_shred_cf: code_shred_cf, | ||||
|             code_shred_cf, | ||||
|             new_blobs_signals: vec![], | ||||
|             batch_processor, | ||||
|             completed_slots_senders: vec![], | ||||
| @@ -258,6 +258,14 @@ impl Blocktree { | ||||
|                     .erasure_cf | ||||
|                     .delete_slot(&mut write_batch, from_slot, batch_end) | ||||
|                     .unwrap_or(false) | ||||
|                 && self | ||||
|                     .data_shred_cf | ||||
|                     .delete_slot(&mut write_batch, from_slot, batch_end) | ||||
|                     .unwrap_or(false) | ||||
|                 && self | ||||
|                     .code_shred_cf | ||||
|                     .delete_slot(&mut write_batch, from_slot, batch_end) | ||||
|                     .unwrap_or(false) | ||||
|                 && self | ||||
|                     .orphans_cf | ||||
|                     .delete_slot(&mut write_batch, from_slot, batch_end) | ||||
| @@ -318,47 +326,145 @@ impl Blocktree { | ||||
|         Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot)) | ||||
|     } | ||||
|  | ||||
|     pub fn insert_shreds(&self, shreds: &[Shred]) -> Result<()> { | ||||
|     fn try_shred_recovery( | ||||
|         db: &Database, | ||||
|         erasure_metas: &HashMap<(u64, u64), ErasureMeta>, | ||||
|         index_working_set: &HashMap<u64, Index>, | ||||
|         prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, | ||||
|         prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, | ||||
|     ) -> Vec<Shred> { | ||||
|         let data_cf = db.column::<cf::ShredData>(); | ||||
|         let code_cf = db.column::<cf::ShredCode>(); | ||||
|         let mut recovered_data_shreds = vec![]; | ||||
|         // Recovery rules: | ||||
|         // 1. Only try recovery around indexes for which new data or coding shreds are received | ||||
|         // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery | ||||
|         // 3. Before trying recovery, check if enough number of shreds have been received | ||||
|         // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data | ||||
|         for (&(slot, set_index), erasure_meta) in erasure_metas.iter() { | ||||
|             let index = index_working_set.get(&slot).expect("Index"); | ||||
|             if let ErasureMetaStatus::CanRecover = erasure_meta.status(&index) { | ||||
|                 // Find shreds for this erasure set and try recovery | ||||
|                 let slot = index.slot; | ||||
|                 let mut available_shreds = vec![]; | ||||
|                 (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| { | ||||
|                     if index.data().is_present(i) { | ||||
|                         if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| { | ||||
|                             let some_data = data_cf | ||||
|                                 .get_bytes((slot, i)) | ||||
|                                 .expect("Database failure, could not fetch data shred"); | ||||
|                             if let Some(data) = some_data { | ||||
|                                 bincode::deserialize(&data).ok() | ||||
|                             } else { | ||||
|                                 warn!("Data shred deleted while reading for recovery"); | ||||
|                                 None | ||||
|                             } | ||||
|                         }) { | ||||
|                             available_shreds.push(shred); | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|                 (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(|i| { | ||||
|                     if index.coding().is_present(i) { | ||||
|                         if let Some(shred) = prev_inserted_codes.remove(&(slot, i)).or_else(|| { | ||||
|                             let some_code = code_cf | ||||
|                                 .get_bytes((slot, i)) | ||||
|                                 .expect("Database failure, could not fetch code shred"); | ||||
|                             if let Some(code) = some_code { | ||||
|                                 bincode::deserialize(&code).ok() | ||||
|                             } else { | ||||
|                                 warn!("Code shred deleted while reading for recovery"); | ||||
|                                 None | ||||
|                             } | ||||
|                         }) { | ||||
|                             available_shreds.push(shred); | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|                 if let Ok(mut result) = Shredder::try_recovery( | ||||
|                     &available_shreds, | ||||
|                     erasure_meta.config.num_data(), | ||||
|                     erasure_meta.config.num_coding(), | ||||
|                     set_index as usize, | ||||
|                     slot, | ||||
|                 ) { | ||||
|                     recovered_data_shreds.append(&mut result.recovered_data); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         recovered_data_shreds | ||||
|     } | ||||
|  | ||||
|     pub fn insert_shreds(&self, shreds: Vec<Shred>) -> Result<()> { | ||||
|         let db = &*self.db; | ||||
|         let mut batch_processor = self.batch_processor.write().unwrap(); | ||||
|         let mut write_batch = batch_processor.batch()?; | ||||
|  | ||||
|         let mut just_inserted_data_indexes = HashMap::new(); | ||||
|         let mut just_inserted_data_shreds = HashMap::new(); | ||||
|         let mut just_inserted_coding_shreds = HashMap::new(); | ||||
|         let mut erasure_metas = HashMap::new(); | ||||
|         let mut slot_meta_working_set = HashMap::new(); | ||||
|         let mut index_working_set = HashMap::new(); | ||||
|  | ||||
|         shreds.iter().for_each(|shred| { | ||||
|         shreds.into_iter().for_each(|shred| { | ||||
|             let slot = shred.slot(); | ||||
|             let shred_index = u64::from(shred.index()); | ||||
|  | ||||
|             let _ = index_working_set.entry(slot).or_insert_with(|| { | ||||
|             let index_meta = index_working_set.entry(slot).or_insert_with(|| { | ||||
|                 self.index_cf | ||||
|                     .get(slot) | ||||
|                     .unwrap() | ||||
|                     .unwrap_or_else(|| Index::new(slot)) | ||||
|             }); | ||||
|         }); | ||||
|  | ||||
|         // Possibly do erasure recovery here | ||||
|             if let Shred::Coding(coding_shred) = &shred { | ||||
|                 // This gives the index of first coding shred in this FEC block | ||||
|                 // So, all coding shreds in a given FEC block will have the same set index | ||||
|                 let pos = u64::from(coding_shred.header.position); | ||||
|                 if shred_index >= pos { | ||||
|                     let set_index = shred_index - pos; | ||||
|  | ||||
|         let dummy_data = vec![]; | ||||
|  | ||||
|         for shred in shreds { | ||||
|             let slot = shred.slot(); | ||||
|             let index = u64::from(shred.index()); | ||||
|  | ||||
|             let inserted = Blocktree::insert_data_shred( | ||||
|                     self.insert_coding_shred( | ||||
|                         set_index, | ||||
|                         coding_shred.header.num_data_shreds as usize, | ||||
|                         coding_shred.header.num_coding_shreds as usize, | ||||
|                         &mut just_inserted_coding_shreds, | ||||
|                         &mut erasure_metas, | ||||
|                         index_meta, | ||||
|                         shred, | ||||
|                         &mut write_batch, | ||||
|                     ) | ||||
|                 } | ||||
|             } else if Blocktree::insert_data_shred( | ||||
|                 db, | ||||
|                 &just_inserted_data_indexes, | ||||
|                 &mut slot_meta_working_set, | ||||
|                 &mut index_working_set, | ||||
|                 shred, | ||||
|                 &shred, | ||||
|                 &mut write_batch, | ||||
|             )?; | ||||
|  | ||||
|             if inserted { | ||||
|                 just_inserted_data_indexes.insert((slot, index), &dummy_data); | ||||
|             ) | ||||
|             .unwrap_or(false) | ||||
|             { | ||||
|                 just_inserted_data_shreds.insert((slot, shred_index), shred); | ||||
|             } | ||||
|         } | ||||
|         }); | ||||
|  | ||||
|         let recovered_data = Self::try_shred_recovery( | ||||
|             &db, | ||||
|             &erasure_metas, | ||||
|             &index_working_set, | ||||
|             &mut just_inserted_data_shreds, | ||||
|             &mut just_inserted_coding_shreds, | ||||
|         ); | ||||
|  | ||||
|         recovered_data.into_iter().for_each(|shred| { | ||||
|             let _ = Blocktree::insert_data_shred( | ||||
|                 db, | ||||
|                 &mut slot_meta_working_set, | ||||
|                 &mut index_working_set, | ||||
|                 &shred, | ||||
|                 &mut write_batch, | ||||
|             ); | ||||
|         }); | ||||
|  | ||||
|         // Handle chaining for the working set | ||||
|         handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; | ||||
| @@ -391,9 +497,53 @@ impl Blocktree { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn insert_coding_shred( | ||||
|         &self, | ||||
|         set_index: u64, | ||||
|         num_data: usize, | ||||
|         num_coding: usize, | ||||
|         prev_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, | ||||
|         erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, | ||||
|         index_meta: &mut Index, | ||||
|         shred: Shred, | ||||
|         write_batch: &mut WriteBatch, | ||||
|     ) { | ||||
|         let slot = shred.slot(); | ||||
|         let shred_index = u64::from(shred.index()); | ||||
|  | ||||
|         let erasure_config = ErasureConfig::new(num_data, num_coding); | ||||
|  | ||||
|         let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { | ||||
|             self.erasure_meta_cf | ||||
|                 .get((slot, set_index)) | ||||
|                 .expect("Expect database get to succeed") | ||||
|                 .unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config)) | ||||
|         }); | ||||
|  | ||||
|         if erasure_config != erasure_meta.config { | ||||
|             // ToDo: This is a potential slashing condition | ||||
|             warn!("Received multiple erasure configs for the same erasure set!!!"); | ||||
|             warn!( | ||||
|                 "Stored config: {:#?}, new config: {:#?}", | ||||
|                 erasure_meta.config, erasure_config | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let serialized_shred = bincode::serialize(&shred).unwrap(); | ||||
|         let inserted = | ||||
|             write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred); | ||||
|         if inserted.is_ok() { | ||||
|             index_meta.coding_mut().set_present(shred_index, true); | ||||
|  | ||||
|             // `or_insert_with` used to prevent stack overflow | ||||
|             prev_inserted_coding_shreds | ||||
|                 .entry((slot, shred_index)) | ||||
|                 .or_insert_with(|| shred); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn insert_data_shred( | ||||
|         db: &Database, | ||||
|         prev_inserted_data_indexes: &HashMap<(u64, u64), &[u8]>, | ||||
|         mut slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>, | ||||
|         index_working_set: &mut HashMap<u64, Index>, | ||||
|         shred: &Shred, | ||||
| @@ -426,31 +576,30 @@ impl Blocktree { | ||||
|                 .unwrap_or(false) | ||||
|         }; | ||||
|  | ||||
|         if should_insert( | ||||
|             slot_meta, | ||||
|             &prev_inserted_data_indexes, | ||||
|             index as u64, | ||||
|             slot, | ||||
|             last_in_slot, | ||||
|             check_data_cf, | ||||
|         ) { | ||||
|             let new_consumed = compute_consume_index( | ||||
|                 prev_inserted_data_indexes, | ||||
|                 slot_meta, | ||||
|                 index, | ||||
|                 slot, | ||||
|                 check_data_cf, | ||||
|             ); | ||||
|         let index_meta = index_working_set | ||||
|             .get_mut(&slot) | ||||
|             .expect("Index must be present for all data blobs") | ||||
|             .data_mut(); | ||||
|  | ||||
|         if !index_meta.is_present(index) | ||||
|             && should_insert(slot_meta, index, slot, last_in_slot, check_data_cf) | ||||
|         { | ||||
|             let new_consumed = if slot_meta.consumed == index { | ||||
|                 let mut current_index = index + 1; | ||||
|  | ||||
|                 while index_meta.is_present(current_index) || check_data_cf(slot, current_index) { | ||||
|                     current_index += 1; | ||||
|                 } | ||||
|                 current_index | ||||
|             } else { | ||||
|                 slot_meta.consumed | ||||
|             }; | ||||
|  | ||||
|             let serialized_shred = bincode::serialize(shred).unwrap(); | ||||
|             write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?; | ||||
|  | ||||
|             update_slot_meta(last_in_slot, slot_meta, index, new_consumed); | ||||
|             index_working_set | ||||
|                 .get_mut(&slot) | ||||
|                 .expect("Index must be present for all data blobs") | ||||
|                 .data_mut() | ||||
|                 .set_present(index, true); | ||||
|             index_meta.set_present(index, true); | ||||
|             trace!("inserted shred into slot {:?} and index {:?}", slot, index); | ||||
|             Ok(true) | ||||
|         } else { | ||||
| @@ -605,12 +754,16 @@ impl Blocktree { | ||||
|                 remaining_ticks_in_slot -= 1; | ||||
|             } | ||||
|  | ||||
|             entries_to_shreds( | ||||
|                 vec![vec![entry.borrow().clone()]], | ||||
|                 ticks_per_slot - remaining_ticks_in_slot, | ||||
|                 ticks_per_slot, | ||||
|                 &mut shredder, | ||||
|             ); | ||||
|             let data = bincode::serialize(&vec![entry.borrow().clone()]).unwrap(); | ||||
|             let mut offset = 0; | ||||
|             while offset < data.len() { | ||||
|                 offset += shredder.write(&data[offset..]).unwrap(); | ||||
|             } | ||||
|             if remaining_ticks_in_slot == 0 { | ||||
|                 shredder.finalize_slot(); | ||||
|             } else { | ||||
|                 shredder.finalize_fec_block(); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if is_full_slot && remaining_ticks_in_slot != 0 { | ||||
| @@ -624,7 +777,7 @@ impl Blocktree { | ||||
|         all_shreds.extend(shreds); | ||||
|  | ||||
|         let num_shreds = all_shreds.len(); | ||||
|         self.insert_shreds(&all_shreds)?; | ||||
|         self.insert_shreds(all_shreds)?; | ||||
|         Ok(num_shreds) | ||||
|     } | ||||
|  | ||||
| @@ -1182,8 +1335,8 @@ impl Blocktree { | ||||
|                 break; | ||||
|             } | ||||
|  | ||||
|             if let Ok(deshred) = Shredder::deshred(&shred_chunk) { | ||||
|                 let entries: Vec<Entry> = bincode::deserialize(&deshred.payload)?; | ||||
|             if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) { | ||||
|                 let entries: Vec<Entry> = bincode::deserialize(&deshred_payload)?; | ||||
|                 trace!("Found entries: {:#?}", entries); | ||||
|                 all_entries.extend(entries); | ||||
|                 num += shred_chunk.len(); | ||||
| @@ -1542,19 +1695,12 @@ fn should_insert_blob( | ||||
|             .unwrap_or(false) | ||||
|     }; | ||||
|  | ||||
|     should_insert( | ||||
|         slot, | ||||
|         prev_inserted_blob_datas, | ||||
|         blob_index, | ||||
|         blob_slot, | ||||
|         last_in_slot, | ||||
|         check_data_cf, | ||||
|     ) | ||||
|     !prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) | ||||
|         && should_insert(slot, blob_index, blob_slot, last_in_slot, check_data_cf) | ||||
| } | ||||
|  | ||||
| fn should_insert<F>( | ||||
|     slot_meta: &SlotMeta, | ||||
|     prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, | ||||
|     index: u64, | ||||
|     slot: u64, | ||||
|     last_in_slot: bool, | ||||
| @@ -1564,10 +1710,7 @@ where | ||||
|     F: Fn(u64, u64) -> bool, | ||||
| { | ||||
|     // Check that the index doesn't already exist | ||||
|     if index < slot_meta.consumed | ||||
|         || prev_inserted_blob_datas.contains_key(&(slot, index)) | ||||
|         || db_check(slot, index) | ||||
|     { | ||||
|     if index < slot_meta.consumed || db_check(slot, index) { | ||||
|         return false; | ||||
|     } | ||||
|     // Check that we do not receive index >= than the last_index | ||||
| @@ -2276,14 +2419,19 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re | ||||
|     let mut shredder = Shredder::new(0, Some(0), 0.0, &Arc::new(Keypair::new()), 0) | ||||
|         .expect("Failed to create entry shredder"); | ||||
|     let last_hash = entries.last().unwrap().hash; | ||||
|     entries_to_shreds(vec![entries], ticks_per_slot, ticks_per_slot, &mut shredder); | ||||
|     let data = bincode::serialize(&entries).unwrap(); | ||||
|     let mut offset = 0; | ||||
|     while offset < data.len() { | ||||
|         offset += shredder.write(&data[offset..]).unwrap(); | ||||
|     } | ||||
|     shredder.finalize_slot(); | ||||
|     let shreds: Vec<Shred> = shredder | ||||
|         .shreds | ||||
|         .iter() | ||||
|         .map(|s| bincode::deserialize(s).unwrap()) | ||||
|         .collect(); | ||||
|  | ||||
|     blocktree.insert_shreds(&shreds)?; | ||||
|     blocktree.insert_shreds(shreds)?; | ||||
|  | ||||
|     Ok(last_hash) | ||||
| } | ||||
| @@ -2602,7 +2750,7 @@ pub mod tests { | ||||
|         // Insert last blob, we're missing the other blobs, so no consecutive | ||||
|         // blobs starting from slot 0, index 0 should exist. | ||||
|         let last_shred = shreds.pop().unwrap(); | ||||
|         ledger.insert_shreds(&[last_shred]).unwrap(); | ||||
|         ledger.insert_shreds(vec![last_shred]).unwrap(); | ||||
|         assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty()); | ||||
|  | ||||
|         let meta = ledger | ||||
| @@ -2612,7 +2760,7 @@ pub mod tests { | ||||
|         assert!(meta.consumed == 0 && meta.received == num_shreds); | ||||
|  | ||||
|         // Insert the other blobs, check for consecutive returned entries | ||||
|         ledger.insert_shreds(&shreds).unwrap(); | ||||
|         ledger.insert_shreds(shreds).unwrap(); | ||||
|         let result = ledger.get_slot_entries(0, 0, None).unwrap(); | ||||
|  | ||||
|         assert_eq!(result, entries); | ||||
| @@ -2645,7 +2793,7 @@ pub mod tests { | ||||
|         // Insert blobs in reverse, check for consecutive returned blobs | ||||
|         for i in (0..num_shreds).rev() { | ||||
|             let shred = shreds.pop().unwrap(); | ||||
|             ledger.insert_shreds(&[shred]).unwrap(); | ||||
|             ledger.insert_shreds(vec![shred]).unwrap(); | ||||
|             let result = ledger.get_slot_entries(0, 0, None).unwrap(); | ||||
|  | ||||
|             let meta = ledger | ||||
| @@ -2721,7 +2869,7 @@ pub mod tests { | ||||
|             let entries = make_tiny_test_entries(8); | ||||
|             let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false); | ||||
|             blocktree | ||||
|                 .insert_shreds(&shreds) | ||||
|                 .insert_shreds(shreds) | ||||
|                 .expect("Expected successful write of blobs"); | ||||
|  | ||||
|             let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false); | ||||
| @@ -2729,7 +2877,7 @@ pub mod tests { | ||||
|                 b.set_index(8 + i as u32); | ||||
|             } | ||||
|             blocktree | ||||
|                 .insert_shreds(&shreds1) | ||||
|                 .insert_shreds(shreds1) | ||||
|                 .expect("Expected successful write of blobs"); | ||||
|  | ||||
|             assert_eq!( | ||||
| @@ -2763,7 +2911,7 @@ pub mod tests { | ||||
|                     index += 1; | ||||
|                 } | ||||
|                 blocktree | ||||
|                     .insert_shreds(&shreds) | ||||
|                     .insert_shreds(shreds) | ||||
|                     .expect("Expected successful write of shreds"); | ||||
|                 assert_eq!( | ||||
|                     blocktree | ||||
| @@ -2796,7 +2944,7 @@ pub mod tests { | ||||
|                     entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false); | ||||
|                 assert!(shreds.len() as u64 >= shreds_per_slot); | ||||
|                 blocktree | ||||
|                     .insert_shreds(&shreds) | ||||
|                     .insert_shreds(shreds) | ||||
|                     .expect("Expected successful write of shreds"); | ||||
|                 assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries); | ||||
|             } | ||||
| @@ -2824,7 +2972,7 @@ pub mod tests { | ||||
|                         odd_shreds.insert(0, shreds.remove(i as usize)); | ||||
|                     } | ||||
|                 } | ||||
|                 blocktree.insert_shreds(&odd_shreds).unwrap(); | ||||
|                 blocktree.insert_shreds(odd_shreds).unwrap(); | ||||
|  | ||||
|                 assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); | ||||
|  | ||||
| @@ -2842,7 +2990,7 @@ pub mod tests { | ||||
|                     assert_eq!(meta.last_index, std::u64::MAX); | ||||
|                 } | ||||
|  | ||||
|                 blocktree.insert_shreds(&shreds).unwrap(); | ||||
|                 blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|                 assert_eq!( | ||||
|                     blocktree.get_slot_entries(slot, 0, None).unwrap(), | ||||
| @@ -2875,19 +3023,19 @@ pub mod tests { | ||||
|             // Discard first shred | ||||
|             original_shreds.remove(0); | ||||
|  | ||||
|             blocktree.insert_shreds(&original_shreds).unwrap(); | ||||
|             blocktree.insert_shreds(original_shreds).unwrap(); | ||||
|  | ||||
|             assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); | ||||
|  | ||||
|             let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true); | ||||
|             blocktree.insert_shreds(&duplicate_shreds).unwrap(); | ||||
|             let num_shreds = duplicate_shreds.len() as u64; | ||||
|             blocktree.insert_shreds(duplicate_shreds).unwrap(); | ||||
|  | ||||
|             assert_eq!( | ||||
|                 blocktree.get_slot_entries(0, 0, None).unwrap(), | ||||
|                 original_entries | ||||
|             ); | ||||
|  | ||||
|             let num_shreds = duplicate_shreds.len() as u64; | ||||
|             let meta = blocktree.meta(0).unwrap().unwrap(); | ||||
|             assert_eq!(meta.consumed, num_shreds); | ||||
|             assert_eq!(meta.received, num_shreds); | ||||
| @@ -3523,11 +3671,11 @@ pub mod tests { | ||||
|             let num_shreds = shreds.len(); | ||||
|             // Write blobs to the database | ||||
|             if should_bulk_write { | ||||
|                 blocktree.insert_shreds(&shreds).unwrap(); | ||||
|                 blocktree.insert_shreds(shreds).unwrap(); | ||||
|             } else { | ||||
|                 for _ in 0..num_shreds { | ||||
|                     let shred = shreds.remove(0); | ||||
|                     blocktree.insert_shreds(&vec![shred]).unwrap(); | ||||
|                     blocktree.insert_shreds(vec![shred]).unwrap(); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
| @@ -3569,7 +3717,7 @@ pub mod tests { | ||||
|             b.set_index(i as u32 * gap as u32); | ||||
|             b.set_slot(slot); | ||||
|         } | ||||
|         blocktree.insert_shreds(&shreds).unwrap(); | ||||
|         blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|         // Index of the first blob is 0 | ||||
|         // Index of the second blob is "gap" | ||||
| @@ -3654,6 +3802,7 @@ pub mod tests { | ||||
|  | ||||
|         let entries = make_tiny_test_entries(20); | ||||
|         let mut shreds = entries_to_test_shreds(entries, slot, 0, true); | ||||
|         shreds.drain(2..); | ||||
|  | ||||
|         const ONE: u64 = 1; | ||||
|         const OTHER: u64 = 4; | ||||
| @@ -3662,7 +3811,7 @@ pub mod tests { | ||||
|         shreds[1].set_index(OTHER as u32); | ||||
|  | ||||
|         // Insert one blob at index = first_index | ||||
|         blocktree.insert_shreds(&shreds[0..2]).unwrap(); | ||||
|         blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|         const STARTS: u64 = OTHER * 2; | ||||
|         const END: u64 = OTHER * 3; | ||||
| @@ -3696,7 +3845,7 @@ pub mod tests { | ||||
|         let shreds = entries_to_test_shreds(entries, slot, 0, true); | ||||
|         let num_shreds = shreds.len(); | ||||
|  | ||||
|         blocktree.insert_shreds(&shreds).unwrap(); | ||||
|         blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|         let empty: Vec<u64> = vec![]; | ||||
|         for i in 0..num_shreds as u64 { | ||||
| @@ -4051,6 +4200,7 @@ pub mod tests { | ||||
|         } | ||||
|  | ||||
|         #[test] | ||||
|         #[ignore] | ||||
|         pub fn test_recovery_basic() { | ||||
|             solana_logger::setup(); | ||||
|  | ||||
| @@ -4357,6 +4507,7 @@ pub mod tests { | ||||
|         } | ||||
|  | ||||
|         #[test] | ||||
|         #[ignore] | ||||
|         fn test_recovery_multi_slot_multi_thread() { | ||||
|             use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; | ||||
|             use std::thread; | ||||
| @@ -4615,14 +4766,16 @@ pub mod tests { | ||||
|         ) | ||||
|         .expect("Failed to create entry shredder"); | ||||
|  | ||||
|         let last_tick = 0; | ||||
|         let bank_max_tick = if is_full_slot { | ||||
|             last_tick | ||||
|         let data = bincode::serialize(&entries).unwrap(); | ||||
|         let mut offset = 0; | ||||
|         while offset < data.len() { | ||||
|             offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         } | ||||
|         if is_full_slot { | ||||
|             shredder.finalize_slot(); | ||||
|         } else { | ||||
|             last_tick + 1 | ||||
|         }; | ||||
|  | ||||
|         entries_to_shreds(vec![entries], last_tick, bank_max_tick, &mut shredder); | ||||
|             shredder.finalize_fec_block(); | ||||
|         } | ||||
|  | ||||
|         let shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|   | ||||
| @@ -277,7 +277,7 @@ impl ErasureMeta { | ||||
|     } | ||||
|  | ||||
|     pub fn start_index(&self) -> u64 { | ||||
|         self.set_index * self.config.num_data() as u64 | ||||
|         self.set_index | ||||
|     } | ||||
|  | ||||
|     /// returns a tuple of (data_end, coding_end) | ||||
|   | ||||
| @@ -108,7 +108,6 @@ trait BroadcastRun { | ||||
|  | ||||
| struct Broadcast { | ||||
|     coding_generator: CodingGenerator, | ||||
|     parent_slot: Option<u64>, | ||||
|     thread_pool: ThreadPool, | ||||
| } | ||||
|  | ||||
| @@ -148,7 +147,6 @@ impl BroadcastStage { | ||||
|  | ||||
|         let mut broadcast = Broadcast { | ||||
|             coding_generator, | ||||
|             parent_slot: None, | ||||
|             thread_pool: rayon::ThreadPoolBuilder::new() | ||||
|                 .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) | ||||
|                 .build() | ||||
| @@ -298,7 +296,6 @@ mod test { | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     #[ignore] | ||||
|     fn test_broadcast_ledger() { | ||||
|         solana_logger::setup(); | ||||
|         let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); | ||||
| @@ -316,12 +313,13 @@ mod test { | ||||
|             let start_tick_height; | ||||
|             let max_tick_height; | ||||
|             let ticks_per_slot; | ||||
|             let slot; | ||||
|             { | ||||
|                 let bank = broadcast_service.bank.clone(); | ||||
|                 start_tick_height = bank.tick_height(); | ||||
|                 max_tick_height = bank.max_tick_height(); | ||||
|                 ticks_per_slot = bank.ticks_per_slot(); | ||||
|  | ||||
|                 slot = bank.slot(); | ||||
|                 let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); | ||||
|                 for (i, tick) in ticks.into_iter().enumerate() { | ||||
|                     entry_sender | ||||
| @@ -339,15 +337,10 @@ mod test { | ||||
|             ); | ||||
|  | ||||
|             let blocktree = broadcast_service.blocktree; | ||||
|             let mut blob_index = 0; | ||||
|             for i in 0..max_tick_height - start_tick_height { | ||||
|                 let slot = (start_tick_height + i + 1) / ticks_per_slot; | ||||
|  | ||||
|                 let result = blocktree.get_data_shred_as_blob(slot, blob_index).unwrap(); | ||||
|  | ||||
|                 blob_index += 1; | ||||
|                 result.expect("expect blob presence"); | ||||
|             } | ||||
|             let (entries, _) = blocktree | ||||
|                 .get_slot_entries_with_shred_count(slot, 0) | ||||
|                 .expect("Expect entries to be present"); | ||||
|             assert_eq!(entries.len(), max_tick_height as usize); | ||||
|  | ||||
|             drop(entry_sender); | ||||
|             broadcast_service | ||||
|   | ||||
| @@ -4,12 +4,10 @@ use crate::erasure::CodingGenerator; | ||||
| use crate::packet::{self, SharedBlob}; | ||||
| use crate::poh_recorder::WorkingBankEntries; | ||||
| use crate::result::Result; | ||||
| use crate::shred::Shredder; | ||||
| use rayon::prelude::*; | ||||
| use rayon::ThreadPool; | ||||
| use solana_runtime::bank::Bank; | ||||
| use solana_sdk::signature::{Keypair, KeypairUtil, Signable}; | ||||
| use std::io::Write; | ||||
| use std::sync::mpsc::Receiver; | ||||
| use std::sync::Arc; | ||||
| use std::time::{Duration, Instant}; | ||||
| @@ -99,34 +97,6 @@ pub(super) fn entries_to_blobs( | ||||
|     (blobs, coding) | ||||
| } | ||||
|  | ||||
| pub fn entries_to_shreds( | ||||
|     ventries: Vec<Vec<Entry>>, | ||||
|     last_tick: u64, | ||||
|     bank_max_tick: u64, | ||||
|     shredder: &mut Shredder, | ||||
| ) { | ||||
|     ventries.iter().enumerate().for_each(|(i, entries)| { | ||||
|         let data = bincode::serialize(entries).unwrap(); | ||||
|         let mut offset = 0; | ||||
|         while offset < data.len() { | ||||
|             offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         } | ||||
|         //        bincode::serialize_into(&shredder, &entries).unwrap(); | ||||
|         trace!( | ||||
|             "Shredded {:?} entries into {:?} shreds", | ||||
|             entries.len(), | ||||
|             shredder.shreds.len() | ||||
|         ); | ||||
|         if i + 1 == ventries.len() && last_tick == bank_max_tick { | ||||
|             debug!("Finalized slot for the shreds"); | ||||
|             shredder.finalize_slot(); | ||||
|         } else { | ||||
|             debug!("Finalized fec block for the shreds"); | ||||
|             shredder.finalize_fec_block(); | ||||
|         } | ||||
|     }) | ||||
| } | ||||
|  | ||||
| pub(super) fn generate_data_blobs( | ||||
|     ventries: Vec<Vec<(Entry, u64)>>, | ||||
|     thread_pool: &ThreadPool, | ||||
|   | ||||
| @@ -2,6 +2,7 @@ use super::broadcast_utils; | ||||
| use super::*; | ||||
| use crate::shred::Shred; | ||||
| use solana_sdk::timing::duration_as_ms; | ||||
| use std::io::Write; | ||||
|  | ||||
| #[derive(Default)] | ||||
| struct BroadcastStats { | ||||
| @@ -51,7 +52,7 @@ impl StandardBroadcastRun { | ||||
| impl BroadcastRun for StandardBroadcastRun { | ||||
|     fn run( | ||||
|         &mut self, | ||||
|         broadcast: &mut Broadcast, | ||||
|         _broadcast: &mut Broadcast, | ||||
|         cluster_info: &Arc<RwLock<ClusterInfo>>, | ||||
|         receiver: &Receiver<WorkingBankEntries>, | ||||
|         sock: &UdpSocket, | ||||
| @@ -68,73 +69,63 @@ impl BroadcastRun for StandardBroadcastRun { | ||||
|         // 2) Convert entries to blobs + generate coding blobs | ||||
|         let to_blobs_start = Instant::now(); | ||||
|         let keypair = &cluster_info.read().unwrap().keypair.clone(); | ||||
|         let latest_blob_index = blocktree | ||||
|         let mut latest_blob_index = blocktree | ||||
|             .meta(bank.slot()) | ||||
|             .expect("Database error") | ||||
|             .map(|meta| meta.consumed) | ||||
|             .unwrap_or(0); | ||||
|  | ||||
|         let parent_slot = bank.parent().unwrap().slot(); | ||||
|         let shredder = if let Some(slot) = broadcast.parent_slot { | ||||
|             if slot != parent_slot { | ||||
|                 trace!("Renew shredder with parent slot {:?}", parent_slot); | ||||
|                 broadcast.parent_slot = Some(parent_slot); | ||||
|                 Shredder::new( | ||||
|                     bank.slot(), | ||||
|                     Some(parent_slot), | ||||
|                     0.0, | ||||
|                     keypair, | ||||
|                     latest_blob_index as u32, | ||||
|                 ) | ||||
|             } else { | ||||
|                 trace!("Renew shredder with same parent slot {:?}", parent_slot); | ||||
|                 Shredder::new( | ||||
|                     bank.slot(), | ||||
|                     Some(parent_slot), | ||||
|                     0.0, | ||||
|                     keypair, | ||||
|                     latest_blob_index as u32, | ||||
|                 ) | ||||
|             } | ||||
|         let parent_slot = if let Some(parent_bank) = bank.parent() { | ||||
|             parent_bank.slot() | ||||
|         } else { | ||||
|             trace!("New shredder with parent slot {:?}", parent_slot); | ||||
|             broadcast.parent_slot = Some(parent_slot); | ||||
|             Shredder::new( | ||||
|                 bank.slot(), | ||||
|                 Some(parent_slot), | ||||
|                 0.0, | ||||
|                 keypair, | ||||
|                 latest_blob_index as u32, | ||||
|             ) | ||||
|             0 | ||||
|         }; | ||||
|         let mut shredder = shredder.expect("Expected to create a new shredder"); | ||||
|  | ||||
|         let ventries = receive_results | ||||
|         let mut all_shreds = vec![]; | ||||
|         let mut all_seeds = vec![]; | ||||
|         let num_ventries = receive_results.ventries.len(); | ||||
|         receive_results | ||||
|             .ventries | ||||
|             .into_iter() | ||||
|             .map(|entries_tuple| { | ||||
|             .enumerate() | ||||
|             .for_each(|(i, entries_tuple)| { | ||||
|                 let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip(); | ||||
|                 entries | ||||
|             }) | ||||
|             .collect(); | ||||
|         broadcast_utils::entries_to_shreds( | ||||
|             ventries, | ||||
|             last_tick, | ||||
|             bank.max_tick_height(), | ||||
|             &mut shredder, | ||||
|         ); | ||||
|                 //entries | ||||
|                 let mut shredder = Shredder::new( | ||||
|                     bank.slot(), | ||||
|                     Some(parent_slot), | ||||
|                     1.0, | ||||
|                     keypair, | ||||
|                     latest_blob_index as u32, | ||||
|                 ) | ||||
|                 .expect("Expected to create a new shredder"); | ||||
|  | ||||
|         let shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
|             .map(|s| bincode::deserialize(s).unwrap()) | ||||
|             .collect(); | ||||
|                 let data = bincode::serialize(&entries).unwrap(); | ||||
|                 let mut offset = 0; | ||||
|                 while offset < data.len() { | ||||
|                     offset += shredder.write(&data[offset..]).unwrap(); | ||||
|                 } | ||||
|  | ||||
|         let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); | ||||
|         trace!("Inserting {:?} shreds in blocktree", shreds.len()); | ||||
|         blocktree | ||||
|             .insert_shreds(&shreds) | ||||
|             .expect("Failed to insert shreds in blocktree"); | ||||
|                 if i == (num_ventries - 1) && last_tick == bank.max_tick_height() { | ||||
|                     shredder.finalize_slot(); | ||||
|                 } else { | ||||
|                     shredder.finalize_fec_block(); | ||||
|                 } | ||||
|  | ||||
|                 let shreds: Vec<Shred> = shredder | ||||
|                     .shreds | ||||
|                     .iter() | ||||
|                     .map(|s| bincode::deserialize(s).unwrap()) | ||||
|                     .collect(); | ||||
|  | ||||
|                 let mut seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); | ||||
|                 trace!("Inserting {:?} shreds in blocktree", shreds.len()); | ||||
|                 blocktree | ||||
|                     .insert_shreds(shreds) | ||||
|                     .expect("Failed to insert shreds in blocktree"); | ||||
|                 latest_blob_index = u64::from(shredder.index); | ||||
|                 all_shreds.append(&mut shredder.shreds); | ||||
|                 all_seeds.append(&mut seeds); | ||||
|             }); | ||||
|  | ||||
|         let to_blobs_elapsed = to_blobs_start.elapsed(); | ||||
|  | ||||
| @@ -143,15 +134,15 @@ impl BroadcastRun for StandardBroadcastRun { | ||||
|         let bank_epoch = bank.get_stakers_epoch(bank.slot()); | ||||
|         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); | ||||
|  | ||||
|         trace!("Broadcasting {:?} shreds", shredder.shreds.len()); | ||||
|         trace!("Broadcasting {:?} shreds", all_shreds.len()); | ||||
|         cluster_info.read().unwrap().broadcast_shreds( | ||||
|             sock, | ||||
|             &shredder.shreds, | ||||
|             &seeds, | ||||
|             &all_shreds, | ||||
|             &all_seeds, | ||||
|             stakes.as_ref(), | ||||
|         )?; | ||||
|  | ||||
|         inc_new_counter_debug!("streamer-broadcast-sent", shredder.shreds.len()); | ||||
|         inc_new_counter_debug!("streamer-broadcast-sent", all_shreds.len()); | ||||
|  | ||||
|         let broadcast_elapsed = broadcast_start.elapsed(); | ||||
|         self.update_broadcast_stats( | ||||
|   | ||||
| @@ -1916,7 +1916,7 @@ mod tests { | ||||
|             shred.set_index(1); | ||||
|  | ||||
|             blocktree | ||||
|                 .insert_shreds(&vec![shred]) | ||||
|                 .insert_shreds(vec![shred]) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             let rv = ClusterInfo::run_window_request( | ||||
| @@ -1994,7 +1994,7 @@ mod tests { | ||||
|             let (blobs, _) = make_many_slot_entries_using_shreds(1, 3, 5); | ||||
|  | ||||
|             blocktree | ||||
|                 .insert_shreds(&blobs) | ||||
|                 .insert_shreds(blobs) | ||||
|                 .expect("Expect successful ledger write"); | ||||
|  | ||||
|             // We don't have slot 4, so we don't know how to service this requeset | ||||
|   | ||||
| @@ -627,7 +627,7 @@ mod tests { | ||||
|         let num_shreds_per_slot = shreds.len() as u64 / num_slots; | ||||
|  | ||||
|         // Write slots in the range [0, num_slots] to blocktree | ||||
|         blocktree.insert_shreds(&shreds).unwrap(); | ||||
|         blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|         // Write roots so that these slots will qualify to be sent by the repairman | ||||
|         let roots: Vec<_> = (0..=num_slots - 1).collect(); | ||||
| @@ -704,7 +704,7 @@ mod tests { | ||||
|         // Create blobs for first two epochs and write them to blocktree | ||||
|         let total_slots = slots_per_epoch * 2; | ||||
|         let (shreds, _) = make_many_slot_entries_using_shreds(0, total_slots, 1); | ||||
|         blocktree.insert_shreds(&shreds).unwrap(); | ||||
|         blocktree.insert_shreds(shreds).unwrap(); | ||||
|  | ||||
|         // Write roots so that these slots will qualify to be sent by the repairman | ||||
|         let roots: Vec<_> = (0..=slots_per_epoch * 2 - 1).collect(); | ||||
|   | ||||
| @@ -488,7 +488,7 @@ mod test { | ||||
|                     missing_indexes_per_slot.insert(0, index); | ||||
|                 } | ||||
|             } | ||||
|             blocktree.insert_shreds(&shreds_to_write).unwrap(); | ||||
|             blocktree.insert_shreds(shreds_to_write).unwrap(); | ||||
|  | ||||
|             let expected: Vec<RepairType> = (0..num_slots) | ||||
|                 .flat_map(|slot| { | ||||
| @@ -548,8 +548,9 @@ mod test { | ||||
|             let num_entries_per_slot = 10; | ||||
|  | ||||
|             let shreds = make_chaining_slot_entries_using_shreds(&slots, num_entries_per_slot); | ||||
|             for (slot_shreds, _) in shreds.iter() { | ||||
|                 blocktree.insert_shreds(&slot_shreds[1..]).unwrap(); | ||||
|             for (mut slot_shreds, _) in shreds.into_iter() { | ||||
|                 slot_shreds.remove(0); | ||||
|                 blocktree.insert_shreds(slot_shreds).unwrap(); | ||||
|             } | ||||
|  | ||||
|             // Iterate through all possible combinations of start..end (inclusive on both | ||||
|   | ||||
| @@ -1018,7 +1018,7 @@ mod test { | ||||
|             let last_blockhash = bank0.last_blockhash(); | ||||
|             progress.insert(bank0.slot(), ForkProgress::new(last_blockhash)); | ||||
|             let shreds = shred_to_insert(&last_blockhash, bank0.slot()); | ||||
|             blocktree.insert_shreds(&shreds).unwrap(); | ||||
|             blocktree.insert_shreds(shreds).unwrap(); | ||||
|             let (res, _tx_count) = | ||||
|                 ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress); | ||||
|  | ||||
|   | ||||
| @@ -873,7 +873,7 @@ impl Replicator { | ||||
|                     .iter() | ||||
|                     .filter_map(|p| bincode::deserialize(&p.data).ok()) | ||||
|                     .collect(); | ||||
|                 blocktree.insert_shreds(&shreds)?; | ||||
|                 blocktree.insert_shreds(shreds)?; | ||||
|             } | ||||
|             // check if all the slots in the segment are complete | ||||
|             if Self::segment_complete(start_slot, slots_per_segment, blocktree) { | ||||
|   | ||||
| @@ -294,7 +294,7 @@ impl ShredCommon for CodingShred { | ||||
| #[derive(Default, Debug)] | ||||
| pub struct Shredder { | ||||
|     slot: u64, | ||||
|     index: u32, | ||||
|     pub index: u32, | ||||
|     pub parent: Option<u64>, | ||||
|     parent_slot: u64, | ||||
|     fec_rate: f32, | ||||
| @@ -363,6 +363,12 @@ impl Write for Shredder { | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Default, Debug, PartialEq)] | ||||
| pub struct RecoveryResult { | ||||
|     pub recovered_data: Vec<Shred>, | ||||
|     pub recovered_code: Vec<Shred>, | ||||
| } | ||||
|  | ||||
| #[derive(Default, Debug, PartialEq)] | ||||
| pub struct DeshredResult { | ||||
|     pub payload: Vec<u8>, | ||||
| @@ -555,9 +561,15 @@ impl Shredder { | ||||
|     ) -> (Vec<Vec<u8>>, bool, usize) { | ||||
|         let (index, mut first_shred_in_slot) = Self::get_shred_index(shred, num_data); | ||||
|  | ||||
|         // The index of current shred must be within the range of shreds that are being | ||||
|         // recovered | ||||
|         if !(first_index..first_index + num_data + num_coding).contains(&index) { | ||||
|             return (vec![], false, index); | ||||
|         } | ||||
|  | ||||
|         let mut missing_blocks: Vec<Vec<u8>> = (expected_index..index) | ||||
|             .map(|missing| { | ||||
|                 present[missing] = false; | ||||
|                 present[missing.saturating_sub(first_index)] = false; | ||||
|                 // If index 0 shred is missing, then first shred in slot will also be recovered | ||||
|                 first_shred_in_slot |= missing == 0; | ||||
|                 Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) | ||||
| @@ -601,39 +613,26 @@ impl Shredder { | ||||
|         bincode::serialize(&missing_shred).unwrap() | ||||
|     } | ||||
|  | ||||
|     /// Combines all shreds to recreate the original buffer | ||||
|     /// If the shreds include coding shreds, and if not all shreds are present, it tries | ||||
|     /// to reconstruct missing shreds using erasure | ||||
|     /// Note: The shreds are expected to be sorted | ||||
|     /// (lower to higher index, and data shreds before coding shreds) | ||||
|     pub fn deshred(shreds: &[Shred]) -> Result<DeshredResult, reed_solomon_erasure::Error> { | ||||
|         // If coding is enabled, the last shred must be a coding shred. | ||||
|         let (num_data, num_coding, first_index, slot) = | ||||
|             if let Shred::Coding(code) = shreds.last().unwrap() { | ||||
|                 ( | ||||
|                     code.header.num_data_shreds as usize, | ||||
|                     code.header.num_coding_shreds as usize, | ||||
|                     code.header.common_header.index as usize - code.header.position as usize, | ||||
|                     code.header.common_header.slot, | ||||
|                 ) | ||||
|             } else { | ||||
|                 (shreds.len(), 0, 0, 0) | ||||
|             }; | ||||
|  | ||||
|     pub fn try_recovery( | ||||
|         shreds: &[Shred], | ||||
|         num_data: usize, | ||||
|         num_coding: usize, | ||||
|         first_index: usize, | ||||
|         slot: u64, | ||||
|     ) -> Result<RecoveryResult, reed_solomon_erasure::Error> { | ||||
|         let mut recovered_data = vec![]; | ||||
|         let mut recovered_code = vec![]; | ||||
|         let fec_set_size = num_data + num_coding; | ||||
|         let (data_shred_bufs, first_shred) = if num_coding > 0 && shreds.len() < fec_set_size { | ||||
|         if num_coding > 0 && shreds.len() < fec_set_size { | ||||
|             let coding_block_offset = CodingShred::overhead(); | ||||
|  | ||||
|             // Let's try recovering missing shreds using erasure | ||||
|             let mut present = &mut vec![true; fec_set_size]; | ||||
|             let mut first_shred_in_slot = false; | ||||
|             let mut next_expected_index = first_index; | ||||
|             let mut shred_bufs: Vec<Vec<u8>> = shreds | ||||
|                 .iter() | ||||
|                 .flat_map(|shred| { | ||||
|                     let (blocks, first_shred, last_index) = Self::fill_in_missing_shreds( | ||||
|                     let (blocks, _first_shred, last_index) = Self::fill_in_missing_shreds( | ||||
|                         shred, | ||||
|                         num_data, | ||||
|                         num_coding, | ||||
| @@ -642,21 +641,26 @@ impl Shredder { | ||||
|                         next_expected_index, | ||||
|                         &mut present, | ||||
|                     ); | ||||
|                     first_shred_in_slot |= first_shred; | ||||
|                     next_expected_index = last_index + 1; | ||||
|                     blocks | ||||
|                 }) | ||||
|                 .collect(); | ||||
|  | ||||
|             // Insert any other missing shreds after the last shred we have received in the | ||||
|             // current FEC block | ||||
|             let mut pending_shreds: Vec<Vec<u8>> = (next_expected_index | ||||
|                 ..first_index + fec_set_size) | ||||
|                 .map(|missing| { | ||||
|                     present[missing] = false; | ||||
|                     present[missing.saturating_sub(first_index)] = false; | ||||
|                     Self::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing) | ||||
|                 }) | ||||
|                 .collect(); | ||||
|             shred_bufs.append(&mut pending_shreds); | ||||
|  | ||||
|             if shred_bufs.len() != fec_set_size { | ||||
|                 Err(reed_solomon_erasure::Error::TooFewShardsPresent)?; | ||||
|             } | ||||
|  | ||||
|             let session = Session::new(num_data, num_coding).unwrap(); | ||||
|  | ||||
|             let mut blocks: Vec<&mut [u8]> = shred_bufs | ||||
| @@ -665,35 +669,64 @@ impl Shredder { | ||||
|                 .collect(); | ||||
|             session.decode_blocks(&mut blocks, &present)?; | ||||
|  | ||||
|             present.iter().enumerate().for_each(|(index, was_present)| { | ||||
|                 if !was_present { | ||||
|                     let shred: Shred = bincode::deserialize(&shred_bufs[index]).unwrap(); | ||||
|                     if index < first_index + num_data { | ||||
|                         // Check if the last recovered data shred is also last in Slot. | ||||
|                         // If so, it needs to be morphed into the correct type | ||||
|                         let shred = if let Shred::Data(s) = shred { | ||||
|                             if s.header.last_in_slot == 1 { | ||||
|                                 Shred::LastInSlot(s) | ||||
|                             } else { | ||||
|                                 Shred::Data(s) | ||||
|             present | ||||
|                 .iter() | ||||
|                 .enumerate() | ||||
|                 .for_each(|(position, was_present)| { | ||||
|                     if !was_present { | ||||
|                         let shred: Shred = bincode::deserialize(&shred_bufs[position]).unwrap(); | ||||
|                         let shred_index = shred.index() as usize; | ||||
|                         // Valid shred must be in the same slot as the original shreds | ||||
|                         if shred.slot() == slot { | ||||
|                             // Data shreds are "positioned" at the start of the iterator. First num_data | ||||
|                             // shreds are expected to be the data shreds. | ||||
|                             if position < num_data | ||||
|                                 && (first_index..first_index + num_data).contains(&shred_index) | ||||
|                             { | ||||
|                                 // Also, a valid data shred must be indexed between first_index and first+num_data index | ||||
|  | ||||
|                                 // Check if the last recovered data shred is also last in Slot. | ||||
|                                 // If so, it needs to be morphed into the correct type | ||||
|                                 let shred = if let Shred::Data(s) = shred { | ||||
|                                     if s.header.last_in_slot == 1 { | ||||
|                                         Shred::LastInSlot(s) | ||||
|                                     } else { | ||||
|                                         Shred::Data(s) | ||||
|                                     } | ||||
|                                 } else if let Shred::LastInFECSet(s) = shred { | ||||
|                                     if s.header.last_in_slot == 1 { | ||||
|                                         Shred::LastInSlot(s) | ||||
|                                     } else { | ||||
|                                         Shred::LastInFECSet(s) | ||||
|                                     } | ||||
|                                 } else { | ||||
|                                     shred | ||||
|                                 }; | ||||
|                                 recovered_data.push(shred) | ||||
|                             } else if (first_index..first_index + num_coding).contains(&shred_index) | ||||
|                             { | ||||
|                                 // A valid coding shred must be indexed between first_index and first+num_coding index | ||||
|                                 recovered_code.push(shred) | ||||
|                             } | ||||
|                         } else if let Shred::LastInFECSet(s) = shred { | ||||
|                             if s.header.last_in_slot == 1 { | ||||
|                                 Shred::LastInSlot(s) | ||||
|                             } else { | ||||
|                                 Shred::LastInFECSet(s) | ||||
|                             } | ||||
|                         } else { | ||||
|                             shred | ||||
|                         }; | ||||
|                         recovered_data.push(shred) | ||||
|                     } else { | ||||
|                         recovered_code.push(shred) | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|             (shred_bufs, first_shred_in_slot) | ||||
|         } else { | ||||
|                 }); | ||||
|         } | ||||
|  | ||||
|         Ok(RecoveryResult { | ||||
|             recovered_data, | ||||
|             recovered_code, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     /// Combines all shreds to recreate the original buffer | ||||
|     /// If the shreds include coding shreds, and if not all shreds are present, it tries | ||||
|     /// to reconstruct missing shreds using erasure | ||||
|     /// Note: The shreds are expected to be sorted | ||||
|     /// (lower to higher index, and data shreds before coding shreds) | ||||
|     pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, reed_solomon_erasure::Error> { | ||||
|         let num_data = shreds.len(); | ||||
|         let (data_shred_bufs, first_shred) = { | ||||
|             let (first_index, first_shred_in_slot) = | ||||
|                 Shredder::get_shred_index(shreds.first().unwrap(), num_data); | ||||
|  | ||||
| @@ -715,11 +748,11 @@ impl Shredder { | ||||
|             (shred_bufs, first_shred_in_slot) | ||||
|         }; | ||||
|  | ||||
|         Ok(DeshredResult { | ||||
|             payload: Self::reassemble_payload(num_data, data_shred_bufs, first_shred), | ||||
|             recovered_data, | ||||
|             recovered_code, | ||||
|         }) | ||||
|         Ok(Self::reassemble_payload( | ||||
|             num_data, | ||||
|             data_shred_bufs, | ||||
|             first_shred, | ||||
|         )) | ||||
|     } | ||||
|  | ||||
|     fn get_shred_index(shred: &Shred, num_data: usize) -> (usize, bool) { | ||||
| @@ -1085,20 +1118,34 @@ mod tests { | ||||
|  | ||||
|         // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail | ||||
|         assert_matches!( | ||||
|             Shredder::deshred(&shreds[..4]), | ||||
|             Err(reed_solomon_erasure::Error::TooFewDataShards) | ||||
|             Shredder::try_recovery( | ||||
|                 &shreds[..4], | ||||
|                 expected_shred_count / 2, | ||||
|                 expected_shred_count / 2, | ||||
|                 0, | ||||
|                 slot | ||||
|             ), | ||||
|             Err(reed_solomon_erasure::Error::TooFewShardsPresent) | ||||
|         ); | ||||
|  | ||||
|         // Test1: Try recovery/reassembly with only data shreds. Hint: should work | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert_ne!(DeshredResult::default(), result); | ||||
|         assert!(result.payload.len() >= data.len()); | ||||
|         let result = Shredder::try_recovery( | ||||
|             &shreds[..5], | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             0, | ||||
|             slot, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert_ne!(RecoveryResult::default(), result); | ||||
|         assert!(result.recovered_data.is_empty()); | ||||
|         assert!(result.recovered_code.is_empty()); | ||||
|         assert_eq!(data[..], result.payload[..data.len()]); | ||||
|         assert!(!result.recovered_code.is_empty()); | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert!(result.len() >= data.len()); | ||||
|         assert_eq!(data[..], result[..data.len()]); | ||||
|  | ||||
|         // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work | ||||
|         let shreds: Vec<Shred> = shredder | ||||
|         let mut shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
|             .enumerate() | ||||
| @@ -1111,20 +1158,30 @@ mod tests { | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         let mut result = Shredder::deshred(&shreds).unwrap(); | ||||
|         assert!(result.payload.len() >= data.len()); | ||||
|         let mut result = Shredder::try_recovery( | ||||
|             &shreds, | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             0, | ||||
|             slot, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert_ne!(RecoveryResult::default(), result); | ||||
|  | ||||
|         assert_eq!(result.recovered_data.len(), 2); // Data shreds 1 and 3 were missing | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 1); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(1, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 3); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(3, recovered_shred); | ||||
|  | ||||
|         assert_eq!(result.recovered_code.len(), 3); // Coding shreds 5, 7, 9 were missing | ||||
|         let recovered_shred = result.recovered_code.remove(0); | ||||
| @@ -1151,10 +1208,13 @@ mod tests { | ||||
|             assert_eq!(code.header.common_header.slot, slot); | ||||
|             assert_eq!(code.header.common_header.index, 4); | ||||
|         } | ||||
|         assert_eq!(data[..], result.payload[..data.len()]); | ||||
|  | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert!(result.len() >= data.len()); | ||||
|         assert_eq!(data[..], result[..data.len()]); | ||||
|  | ||||
|         // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work | ||||
|         let shreds: Vec<Shred> = shredder | ||||
|         let mut shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
|             .enumerate() | ||||
| @@ -1167,26 +1227,37 @@ mod tests { | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         let mut result = Shredder::deshred(&shreds).unwrap(); | ||||
|         assert!(result.payload.len() >= data.len()); | ||||
|         let mut result = Shredder::try_recovery( | ||||
|             &shreds, | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             0, | ||||
|             slot, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert_ne!(RecoveryResult::default(), result); | ||||
|  | ||||
|         assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::FirstInSlot(_)); | ||||
|         assert_eq!(recovered_shred.index(), 0); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(0, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 2); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(2, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::LastInFECSet(_)); | ||||
|         assert_eq!(recovered_shred.index(), 4); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(4, recovered_shred); | ||||
|  | ||||
|         assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing | ||||
|         let recovered_shred = result.recovered_code.remove(0); | ||||
| @@ -1205,7 +1276,10 @@ mod tests { | ||||
|             assert_eq!(code.header.common_header.slot, slot); | ||||
|             assert_eq!(code.header.common_header.index, 3); | ||||
|         } | ||||
|         assert_eq!(data[..], result.payload[..data.len()]); | ||||
|  | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert!(result.len() >= data.len()); | ||||
|         assert_eq!(data[..], result[..data.len()]); | ||||
|  | ||||
|         // Test4: Try recovery/reassembly full slot with 3 missing data shreds + 2 coding shreds. Hint: should work | ||||
|         let mut shredder = | ||||
| @@ -1231,7 +1305,7 @@ mod tests { | ||||
|         let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; | ||||
|         assert_eq!(shredder.shreds.len(), expected_shred_count); | ||||
|  | ||||
|         let shreds: Vec<Shred> = shredder | ||||
|         let mut shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
|             .enumerate() | ||||
| @@ -1244,26 +1318,37 @@ mod tests { | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         let mut result = Shredder::deshred(&shreds).unwrap(); | ||||
|         assert!(result.payload.len() >= data.len()); | ||||
|         let mut result = Shredder::try_recovery( | ||||
|             &shreds, | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             0, | ||||
|             slot, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert_ne!(RecoveryResult::default(), result); | ||||
|  | ||||
|         assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::FirstInSlot(_)); | ||||
|         assert_eq!(recovered_shred.index(), 0); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(0, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 2); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(2, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::LastInSlot(_)); | ||||
|         assert_eq!(recovered_shred.index(), 4); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(4, recovered_shred); | ||||
|  | ||||
|         assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing | ||||
|         let recovered_shred = result.recovered_code.remove(0); | ||||
| @@ -1282,7 +1367,10 @@ mod tests { | ||||
|             assert_eq!(code.header.common_header.slot, slot); | ||||
|             assert_eq!(code.header.common_header.index, 3); | ||||
|         } | ||||
|         assert_eq!(data[..], result.payload[..data.len()]); | ||||
|  | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert!(result.len() >= data.len()); | ||||
|         assert_eq!(data[..], result[..data.len()]); | ||||
|  | ||||
|         // Test5: Try recovery/reassembly with 3 missing data shreds + 3 coding shreds. Hint: should fail | ||||
|         let shreds: Vec<Shred> = shredder | ||||
| @@ -1301,6 +1389,132 @@ mod tests { | ||||
|         assert_eq!(shreds.len(), 4); | ||||
|         assert_matches!( | ||||
|             Shredder::deshred(&shreds), | ||||
|             Err(reed_solomon_erasure::Error::TooFewDataShards) | ||||
|         ); | ||||
|  | ||||
|         // Test6: Try recovery/reassembly with non zero index full slot with 3 missing data shreds + 2 coding shreds. Hint: should work | ||||
|         let mut shredder = | ||||
|             Shredder::new(slot, Some(5), 1.0, &keypair, 25).expect("Failed in creating shredder"); | ||||
|  | ||||
|         let mut offset = shredder.write(&data).unwrap(); | ||||
|         let approx_shred_payload_size = offset; | ||||
|         offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         offset += shredder.write(&data[offset..]).unwrap(); | ||||
|  | ||||
|         // We should have some shreds now | ||||
|         assert_eq!( | ||||
|             shredder.shreds.len(), | ||||
|             data.len() / approx_shred_payload_size | ||||
|         ); | ||||
|         assert_eq!(offset, data.len()); | ||||
|  | ||||
|         shredder.finalize_slot(); | ||||
|  | ||||
|         // We should have 10 shreds now (one additional final shred, and equal number of coding shreds) | ||||
|         let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2; | ||||
|         assert_eq!(shredder.shreds.len(), expected_shred_count); | ||||
|  | ||||
|         let mut shreds: Vec<Shred> = shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
|             .enumerate() | ||||
|             .filter_map(|(i, s)| { | ||||
|                 if i % 2 != 0 { | ||||
|                     Some(bincode::deserialize(s).unwrap()) | ||||
|                 } else { | ||||
|                     None | ||||
|                 } | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         let mut result = Shredder::try_recovery( | ||||
|             &shreds, | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             25, | ||||
|             slot, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert_ne!(RecoveryResult::default(), result); | ||||
|  | ||||
|         assert_eq!(result.recovered_data.len(), 3); // Data shreds 0, 2 and 4 were missing | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 25); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(0, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::Data(_)); | ||||
|         assert_eq!(recovered_shred.index(), 27); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(2, recovered_shred); | ||||
|  | ||||
|         let recovered_shred = result.recovered_data.remove(0); | ||||
|         assert_matches!(recovered_shred, Shred::LastInSlot(_)); | ||||
|         assert_eq!(recovered_shred.index(), 29); | ||||
|         assert_eq!(recovered_shred.slot(), slot); | ||||
|         assert!(recovered_shred.verify(&keypair.pubkey())); | ||||
|         shreds.insert(4, recovered_shred); | ||||
|  | ||||
|         assert_eq!(result.recovered_code.len(), 2); // Coding shreds 6, 8 were missing | ||||
|         let recovered_shred = result.recovered_code.remove(0); | ||||
|         if let Shred::Coding(code) = recovered_shred { | ||||
|             assert_eq!(code.header.num_data_shreds, 5); | ||||
|             assert_eq!(code.header.num_coding_shreds, 5); | ||||
|             assert_eq!(code.header.position, 1); | ||||
|             assert_eq!(code.header.common_header.slot, slot); | ||||
|             assert_eq!(code.header.common_header.index, 26); | ||||
|         } | ||||
|         let recovered_shred = result.recovered_code.remove(0); | ||||
|         if let Shred::Coding(code) = recovered_shred { | ||||
|             assert_eq!(code.header.num_data_shreds, 5); | ||||
|             assert_eq!(code.header.num_coding_shreds, 5); | ||||
|             assert_eq!(code.header.position, 3); | ||||
|             assert_eq!(code.header.common_header.slot, slot); | ||||
|             assert_eq!(code.header.common_header.index, 28); | ||||
|         } | ||||
|  | ||||
|         let result = Shredder::deshred(&shreds[..5]).unwrap(); | ||||
|         assert!(result.len() >= data.len()); | ||||
|         assert_eq!(data[..], result[..data.len()]); | ||||
|  | ||||
|         // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds | ||||
|         let result = Shredder::try_recovery( | ||||
|             &shreds, | ||||
|             expected_shred_count / 2, | ||||
|             expected_shred_count / 2, | ||||
|             25, | ||||
|             slot + 1, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         assert!(result.recovered_data.is_empty()); | ||||
|  | ||||
|         // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds | ||||
|         assert_matches!( | ||||
|             Shredder::try_recovery( | ||||
|                 &shreds, | ||||
|                 expected_shred_count / 2, | ||||
|                 expected_shred_count / 2, | ||||
|                 15, | ||||
|                 slot, | ||||
|             ), | ||||
|             Err(reed_solomon_erasure::Error::TooFewShardsPresent) | ||||
|         ); | ||||
|  | ||||
|         // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds | ||||
|         assert_matches!( | ||||
|             Shredder::try_recovery( | ||||
|                 &shreds, | ||||
|                 expected_shred_count / 2, | ||||
|                 expected_shred_count / 2, | ||||
|                 35, | ||||
|                 slot, | ||||
|             ), | ||||
|             Err(reed_solomon_erasure::Error::TooFewShardsPresent) | ||||
|         ); | ||||
|     } | ||||
|   | ||||
| @@ -690,7 +690,6 @@ mod tests { | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     #[ignore] | ||||
|     fn test_storage_stage_process_banks() { | ||||
|         solana_logger::setup(); | ||||
|         let keypair = Arc::new(Keypair::new()); | ||||
|   | ||||
| @@ -23,7 +23,7 @@ use std::time::{Duration, Instant}; | ||||
| pub const NUM_THREADS: u32 = 10; | ||||
|  | ||||
| /// Process a blob: Add blob to the ledger window. | ||||
| pub fn process_shreds(shreds: &[Shred], blocktree: &Arc<Blocktree>) -> Result<()> { | ||||
| pub fn process_shreds(shreds: Vec<Shred>, blocktree: &Arc<Blocktree>) -> Result<()> { | ||||
|     blocktree.insert_shreds(shreds) | ||||
| } | ||||
|  | ||||
| @@ -112,7 +112,7 @@ where | ||||
|         }?; | ||||
|     } | ||||
|  | ||||
|     blocktree.insert_shreds(&shreds)?; | ||||
|     blocktree.insert_shreds(shreds)?; | ||||
|  | ||||
|     trace!( | ||||
|         "Elapsed processing time in recv_window(): {}", | ||||
| @@ -249,7 +249,6 @@ mod test { | ||||
|     use super::*; | ||||
|     use crate::bank_forks::BankForks; | ||||
|     use crate::blocktree::{get_tmp_ledger_path, Blocktree}; | ||||
|     use crate::broadcast_stage::broadcast_utils::entries_to_shreds; | ||||
|     use crate::cluster_info::{ClusterInfo, Node}; | ||||
|     use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry}; | ||||
|     use crate::genesis_utils::create_genesis_block_with_leader; | ||||
| @@ -261,6 +260,7 @@ mod test { | ||||
|     use solana_sdk::hash::Hash; | ||||
|     use solana_sdk::signature::{Keypair, KeypairUtil}; | ||||
|     use std::fs::remove_dir_all; | ||||
|     use std::io::Write; | ||||
|     use std::net::UdpSocket; | ||||
|     use std::sync::atomic::{AtomicBool, Ordering}; | ||||
|     use std::sync::mpsc::channel; | ||||
| @@ -270,7 +270,12 @@ mod test { | ||||
|     fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> { | ||||
|         let mut shredder = | ||||
|             Shredder::new(0, Some(0), 0.0, keypair, 0).expect("Failed to create entry shredder"); | ||||
|         entries_to_shreds(vec![entries], 0, 0, &mut shredder); | ||||
|         let data = bincode::serialize(&entries).unwrap(); | ||||
|         let mut offset = 0; | ||||
|         while offset < data.len() { | ||||
|             offset += shredder.write(&data[offset..]).unwrap(); | ||||
|         } | ||||
|         shredder.finalize_slot(); | ||||
|         shredder | ||||
|             .shreds | ||||
|             .iter() | ||||
| @@ -287,7 +292,7 @@ mod test { | ||||
|         let shreds = local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); | ||||
|  | ||||
|         for shred in shreds.into_iter().rev() { | ||||
|             process_shreds(&[shred], &blocktree).expect("Expect successful processing of blob"); | ||||
|             process_shreds(vec![shred], &blocktree).expect("Expect successful processing of blob"); | ||||
|         } | ||||
|  | ||||
|         assert_eq!( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user