diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 66318a7ee5..70814654ae 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -67,6 +67,7 @@ db_imports! {kvs, Kvs, "kvstore"} pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; +pub type SlotMetaWorkingSetEntry = (Rc>, Option); pub type CompletedSlotsReceiver = Receiver>; #[derive(Debug)] @@ -416,51 +417,29 @@ impl Blocktree { let mut batch_processor = self.batch_processor.write().unwrap(); let mut write_batch = batch_processor.batch()?; - let mut just_inserted_data_shreds = HashMap::new(); let mut just_inserted_coding_shreds = HashMap::new(); + let mut just_inserted_data_shreds = HashMap::new(); let mut erasure_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); shreds.into_iter().for_each(|shred| { - let slot = shred.slot(); - let shred_index = u64::from(shred.index()); - - let index_meta = index_working_set.entry(slot).or_insert_with(|| { - self.index_cf - .get(slot) - .unwrap() - .unwrap_or_else(|| Index::new(slot)) - }); - - if let Shred::Coding(coding_shred) = &shred { - // This gives the index of first coding shred in this FEC block - // So, all coding shreds in a given FEC block will have the same set index - let pos = u64::from(coding_shred.header.position); - if shred_index >= pos { - let set_index = shred_index - pos; - - self.insert_coding_shred( - set_index, - coding_shred.header.num_data_shreds as usize, - coding_shred.header.num_coding_shreds as usize, - &mut just_inserted_coding_shreds, - &mut erasure_metas, - index_meta, - shred, - &mut write_batch, - ) - } - } else if self - .insert_data_shred( - &mut slot_meta_working_set, + if let Shred::Coding(_) = &shred { + self.check_insert_coding_shred( + shred, + &mut erasure_metas, &mut index_working_set, - &shred, &mut write_batch, - ) - .unwrap_or(false) - { - just_inserted_data_shreds.insert((slot, shred_index), shred); + &mut just_inserted_coding_shreds, + ); + } else { + self.check_insert_data_shred( + shred, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + ); } }); @@ -473,10 +452,10 @@ impl Blocktree { ); recovered_data.into_iter().for_each(|shred| { - let _ = self.insert_data_shred( - &mut slot_meta_working_set, - &mut index_working_set, + self.insert_recovered_data_shred( &shred, + &mut index_working_set, + &mut slot_meta_working_set, &mut write_batch, ); }); @@ -484,7 +463,7 @@ impl Blocktree { // Handle chaining for the working set handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?; - let (should_signal, newly_completed_slots) = prepare_signals( + let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( &slot_meta_working_set, &self.completed_slots_senders, &mut write_batch, @@ -516,20 +495,165 @@ impl Blocktree { Ok(()) } - fn insert_coding_shred( + fn insert_recovered_data_shred( &self, - set_index: u64, - num_data: usize, - num_coding: usize, - prev_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, - erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, - index_meta: &mut Index, - shred: Shred, + shred: &Shred, + index_working_set: &mut HashMap, + slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, + ) { + let slot = shred.slot(); + let (index_meta, mut new_index_meta) = + get_index_meta_entry(&self.db, slot, index_working_set); + let (slot_meta_entry, mut new_slot_meta_entry) = + get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); + + let insert_ok = { + let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); + let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); + let mut slot_meta = entry.0.borrow_mut(); + + self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch) + .is_ok() + }; + + if insert_ok { + new_index_meta.map(|n| index_working_set.insert(slot, n)); + new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n)); + } + } + + fn check_insert_coding_shred( + &self, + shred: Shred, + erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + index_working_set: &mut HashMap, + write_batch: &mut WriteBatch, + just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>, ) { let slot = shred.slot(); let shred_index = u64::from(shred.index()); + let (index_meta, mut new_index_meta) = + get_index_meta_entry(&self.db, slot, index_working_set); + + let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); + // This gives the index of first coding shred in this FEC block + // So, all coding shreds in a given FEC block will have the same set index + if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) + && self + .insert_coding_shred(erasure_metas, index_meta, &shred, write_batch) + .is_ok() + { + just_inserted_coding_shreds + .entry((slot, shred_index)) + .or_insert_with(|| shred); + new_index_meta.map(|n| index_working_set.insert(slot, n)); + } + } + + fn check_insert_data_shred( + &self, + shred: Shred, + index_working_set: &mut HashMap, + slot_meta_working_set: &mut HashMap, + write_batch: &mut WriteBatch, + just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, + ) { + let slot = shred.slot(); + let shred_index = u64::from(shred.index()); + let (index_meta, mut new_index_meta) = + get_index_meta_entry(&self.db, slot, index_working_set); + let (slot_meta_entry, mut new_slot_meta_entry) = + get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); + + let insert_success = { + let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); + let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); + let mut slot_meta = entry.0.borrow_mut(); + + if Blocktree::should_insert_data_shred( + &shred, + &slot_meta, + index_meta.data(), + &self.last_root, + ) { + self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch) + .is_ok() + } else { + false + } + }; + + if insert_success { + just_inserted_data_shreds.insert((slot, shred_index), shred); + new_index_meta.map(|n| index_working_set.insert(slot, n)); + new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n)); + } + } + + fn should_insert_coding_shred( + shred: &Shred, + coding_index: &CodingIndex, + last_root: &RwLock, + ) -> bool { + let slot = shred.slot(); + let shred_index = shred.index(); + + let (pos, num_coding) = { + if let Shred::Coding(coding_shred) = &shred { + ( + u32::from(coding_shred.header.position), + coding_shred.header.num_coding_shreds, + ) + } else { + panic!("should_insert_coding_shred called with non-coding shred") + } + }; + + if shred_index < pos { + return false; + } + + let set_index = shred_index - pos; + !(num_coding == 0 + || pos >= u32::from(num_coding) + || std::u32::MAX - set_index < u32::from(num_coding) - 1 + || coding_index.is_present(u64::from(shred_index)) + || slot <= *last_root.read().unwrap()) + } + + fn insert_coding_shred( + &self, + erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, + index_meta: &mut Index, + shred: &Shred, + write_batch: &mut WriteBatch, + ) -> Result<()> { + let slot = shred.slot(); + let shred_index = u64::from(shred.index()); + let (num_data, num_coding, pos) = { + if let Shred::Coding(coding_shred) = &shred { + ( + coding_shred.header.num_data_shreds as usize, + coding_shred.header.num_coding_shreds as usize, + u64::from(coding_shred.header.position), + ) + } else { + panic!("insert_coding_shred called with non-coding shred") + } + }; + + // Assert guaranteed by integrity checks on the shred that happen before + // `insert_coding_shred` is called + if shred_index < pos { + error!("Due to earlier validation, shred index must be >= pos"); + return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData( + Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())), + ))); + } + + let set_index = shred_index - pos; let erasure_config = ErasureConfig::new(num_data, num_coding); let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { @@ -548,26 +672,109 @@ impl Blocktree { ); } - let serialized_shred = bincode::serialize(&shred).unwrap(); - let inserted = - write_batch.put_bytes::((slot, shred_index), &serialized_shred); - if inserted.is_ok() { - index_meta.coding_mut().set_present(shred_index, true); + let serialized_shred = bincode::serialize(shred).unwrap(); - // `or_insert_with` used to prevent stack overflow - prev_inserted_coding_shreds - .entry((slot, shred_index)) - .or_insert_with(|| shred); + // Commit step: commit all changes to the mutable structures at once, or none at all. + // We don't want only a subset of these changes going through. + write_batch.put_bytes::((slot, shred_index), &serialized_shred)?; + index_meta.coding_mut().set_present(shred_index, true); + + Ok(()) + } + + fn should_insert_data_shred( + shred: &Shred, + slot_meta: &SlotMeta, + data_index: &DataIndex, + last_root: &RwLock, + ) -> bool { + let shred_index = u64::from(shred.index()); + let slot = shred.slot(); + let last_in_slot = if let Shred::LastInSlot(_) = shred { + debug!("got last in slot"); + true + } else { + false + }; + + // Check that the data shred doesn't already exist in blocktree + if shred_index < slot_meta.consumed || data_index.is_present(shred_index) { + return false; } + + // Check that we do not receive shred_index >= than the last_index + // for the slot + let last_index = slot_meta.last_index; + if shred_index >= last_index { + datapoint_error!( + "blocktree_error", + ( + "error", + format!( + "Received index {} >= slot.last_index {}", + shred_index, last_index + ), + String + ) + ); + return false; + } + // Check that we do not receive a blob with "last_index" true, but shred_index + // less than our current received + if last_in_slot && shred_index < slot_meta.received { + datapoint_error!( + "blocktree_error", + ( + "error", + format!( + "Received shred_index {} < slot.received {}", + shred_index, slot_meta.received + ), + String + ) + ); + return false; + } + + let last_root = *last_root.read().unwrap(); + if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) { + // Check that the parent_slot < slot + if slot_meta.parent_slot >= slot { + datapoint_error!( + "blocktree_error", + ( + "error", + format!( + "Received blob with parent_slot {} >= slot {}", + slot_meta.parent_slot, slot + ), + String + ) + ); + return false; + } + + // Check that the blob is for a slot that is past the root + if slot <= last_root { + return false; + } + + // Ignore blobs that chain to slots before the last root + if slot_meta.parent_slot < last_root { + return false; + } + } + + true } fn insert_data_shred( &self, - mut slot_meta_working_set: &mut HashMap>, Option)>, - index_working_set: &mut HashMap, + slot_meta: &mut SlotMeta, + data_index: &mut DataIndex, shred: &Shred, write_batch: &mut WriteBatch, - ) -> Result { + ) -> Result<()> { let slot = shred.slot(); let index = u64::from(shred.index()); let parent = shred.parent(); @@ -579,9 +786,6 @@ impl Blocktree { false }; - let entry = get_slot_meta_entry(&self.db, &mut slot_meta_working_set, slot, parent); - - let slot_meta = &mut entry.0.borrow_mut(); if is_orphan(slot_meta) { slot_meta.parent_slot = parent; } @@ -595,45 +799,26 @@ impl Blocktree { .unwrap_or(false) }; - let index_meta = index_working_set - .get_mut(&slot) - .expect("Index must be present for all data shreds") - .data_mut(); + let new_consumed = if slot_meta.consumed == index { + let mut current_index = index + 1; - if !index_meta.is_present(index) - && should_insert( - slot_meta, - index, - slot, - last_in_slot, - check_data_cf, - *self.last_root.read().unwrap(), - ) - { - let new_consumed = if slot_meta.consumed == index { - let mut current_index = index + 1; - - while index_meta.is_present(current_index) || check_data_cf(slot, current_index) { - current_index += 1; - } - current_index - } else { - slot_meta.consumed - }; - - let serialized_shred = bincode::serialize(shred).unwrap(); - - // Commit step: commit all changes to the mutable structures at once, or none at all. - // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), &serialized_shred)?; - update_slot_meta(last_in_slot, slot_meta, index, new_consumed); - index_meta.set_present(index, true); - trace!("inserted shred into slot {:?} and index {:?}", slot, index); - Ok(true) + while data_index.is_present(current_index) || check_data_cf(slot, current_index) { + current_index += 1; + } + current_index } else { - debug!("didn't insert shred"); - Ok(false) - } + slot_meta.consumed + }; + + let serialized_shred = bincode::serialize(shred).unwrap(); + + // Commit step: commit all changes to the mutable structures at once, or none at all. + // We don't want only a subset of these changes going through. + write_batch.put_bytes::((slot, index), &serialized_shred)?; + update_slot_meta(last_in_slot, slot_meta, index, new_consumed); + data_index.set_present(index, true); + trace!("inserted shred into slot {:?} and index {:?}", slot, index); + Ok(()) } pub fn get_data_shred(&self, slot: u64, index: u64) -> Result>> { @@ -1086,112 +1271,65 @@ fn update_slot_meta( }; } -fn get_slot_meta_entry<'a>( +fn get_index_meta_entry<'a>( db: &Database, - slot_meta_working_set: &'a mut HashMap>, Option)>, slot: u64, - parent_slot: u64, -) -> &'a mut (Rc>, Option) { - let meta_cf = db.column::(); + index_working_set: &'a mut HashMap, +) -> (Option<&'a mut Index>, Option) { + let index_cf = db.column::(); + index_working_set + .get_mut(&slot) + .map(|i| (Some(i), None)) + .unwrap_or_else(|| { + let newly_inserted_meta = Some( + index_cf + .get(slot) + .unwrap() + .unwrap_or_else(|| Index::new(slot)), + ); - // Check if we've already inserted the slot metadata for this shred's slot - slot_meta_working_set.entry(slot).or_insert_with(|| { - // Store a 2-tuple of the metadata (working copy, backup copy) - if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") { - let backup = Some(meta.clone()); - // If parent_slot == std::u64::MAX, then this is one of the orphans inserted - // during the chaining process, see the function find_slot_meta_in_cached_state() - // for details. Slots that are orphans are missing a parent_slot, so we should - // fill in the parent now that we know it. - if is_orphan(&meta) { - meta.parent_slot = parent_slot; - } - - (Rc::new(RefCell::new(meta)), backup) - } else { - ( - Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))), - None, - ) - } - }) + (None, newly_inserted_meta) + }) } -fn should_insert( - slot_meta: &SlotMeta, - index: u64, +fn get_slot_meta_entry<'a>( + db: &Database, + slot_meta_working_set: &'a mut HashMap, slot: u64, - last_in_slot: bool, - db_check: F, - last_root: u64, -) -> bool -where - F: Fn(u64, u64) -> bool, -{ - // Check that the index doesn't already exist - if index < slot_meta.consumed || db_check(slot, index) { - return false; - } - // Check that we do not receive index >= than the last_index - // for the slot - let last_index = slot_meta.last_index; - if index >= last_index { - datapoint_error!( - "blocktree_error", - ( - "error", - format!("Received index {} >= slot.last_index {}", index, last_index), - String - ) - ); - return false; - } - // Check that we do not receive a shred with "last_index" true, but index - // less than our current received - if last_in_slot && index < slot_meta.received { - datapoint_error!( - "blocktree_error", - ( - "error", - format!( - "Received index {} < slot.received {}", - index, slot_meta.received - ), - String - ) - ); - return false; - } + parent_slot: u64, +) -> ( + Option<&'a mut SlotMetaWorkingSetEntry>, + Option, +) { + let meta_cf = db.column::(); - if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) { - // Check that the parent_slot < slot - if slot_meta.parent_slot >= slot { - datapoint_error!( - "blocktree_error", + // Check if we've already inserted the slot metadata for this blob's slot + slot_meta_working_set + .get_mut(&slot) + .map(|s| (Some(s), None)) + .unwrap_or_else(|| { + // Store a 2-tuple of the metadata (working copy, backup copy) + if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") { + let backup = Some(meta.clone()); + // If parent_slot == std::u64::MAX, then this is one of the orphans inserted + // during the chaining process, see the function find_slot_meta_in_cached_state() + // for details. Slots that are orphans are missing a parent_slot, so we should + // fill in the parent now that we know it. + if is_orphan(&meta) { + meta.parent_slot = parent_slot; + } + + (None, Some((Rc::new(RefCell::new(meta)), backup))) + } else { ( - "error", - format!( - "Received shred with parent_slot {} >= slot {}", - slot_meta.parent_slot, slot - ), - String + None, + Some(( + Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))), + None, + )), ) - ); - return false; - } - - // Check that the shred is for a slot that is past the root - if slot <= last_root { - return false; - } - - // Ignore shreds that chain to slots before the last root - if slot_meta.parent_slot < last_root { - return false; - } - } - - true + } + }) } fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool { @@ -1239,8 +1377,8 @@ fn send_signals( Ok(()) } -fn prepare_signals( - slot_meta_working_set: &HashMap>, Option)>, +fn commit_slot_meta_working_set( + slot_meta_working_set: &HashMap, completed_slots_senders: &[SyncSender>], write_batch: &mut WriteBatch, ) -> Result<(bool, Vec)> { @@ -1270,7 +1408,7 @@ fn prepare_signals( // 3) Create a dummy orphan slot in the database fn find_slot_meta_else_create<'a>( db: &Database, - working_set: &'a HashMap>, Option)>, + working_set: &'a HashMap, chained_slots: &'a mut HashMap>>, slot_index: u64, ) -> Result>> { @@ -1306,7 +1444,7 @@ fn find_slot_meta_in_db_else_create<'a>( // Find the slot metadata in the cache of dirty slot metadata we've previously touched fn find_slot_meta_in_cached_state<'a>( - working_set: &'a HashMap>, Option)>, + working_set: &'a HashMap, chained_slots: &'a HashMap>>, slot: u64, ) -> Result>>> { @@ -1339,7 +1477,7 @@ fn get_slot_consecutive_shreds<'a>( fn handle_chaining( db: &Database, write_batch: &mut WriteBatch, - working_set: &HashMap>, Option)>, + working_set: &HashMap, ) -> Result<()> { let mut new_chained_slots = HashMap::new(); let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect(); @@ -1358,7 +1496,7 @@ fn handle_chaining( fn handle_chaining_for_slot( db: &Database, write_batch: &mut WriteBatch, - working_set: &HashMap>, Option)>, + working_set: &HashMap, new_chained_slots: &mut HashMap>>, slot: u64, ) -> Result<()> { @@ -1438,7 +1576,7 @@ fn traverse_children_mut( db: &Database, slot: u64, slot_meta: &Rc>, - working_set: &HashMap>, Option)>, + working_set: &HashMap, new_chained_slots: &mut HashMap>>, slot_function: F, ) -> Result<()> @@ -1620,6 +1758,7 @@ pub fn entries_to_test_shreds( pub mod tests { use super::*; use crate::entry::{create_ticks, make_tiny_test_entries, Entry}; + use crate::shred::CodingShred; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -2965,62 +3104,230 @@ pub mod tests { } #[test] - pub fn test_should_insert_shred() { - let (mut shreds, _) = make_slot_entries(0, 0, 100); + pub fn test_should_insert_data_shred() { + let (shreds, _) = make_slot_entries(0, 0, 100); let blocktree_path = get_tmp_ledger_path!(); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); + let index_cf = blocktree.db.column::(); + let last_root = RwLock::new(0); + // Insert the first 5 shreds, we don't have a "is_last" shred yet - let shreds1 = shreds.drain(0..5).collect_vec(); - blocktree.insert_shreds(shreds1).unwrap(); + blocktree.insert_shreds(shreds[0..5].to_vec()).unwrap(); - let data_cf = blocktree.db.column::(); - - let check_data_cf = |slot, index| { - data_cf - .get_bytes((slot, index)) - .map(|opt| opt.is_some()) - .unwrap_or(false) - }; - - // Trying to insert a shred less than consumed should fail + // Trying to insert a shred less than `slot_meta.consumed` should fail let slot_meta = blocktree.meta(0).unwrap().unwrap(); + let index = index_cf.get(0).unwrap().unwrap(); assert_eq!(slot_meta.consumed, 5); - assert!(!should_insert(&slot_meta, 3, 0, false, check_data_cf, 0)); + assert!(!Blocktree::should_insert_data_shred( + &shreds[1], + &slot_meta, + index.data(), + &last_root + )); // Trying to insert the same shred again should fail - // skip over shred 5 - let shreds1 = shreds.drain(1..2).collect_vec(); - blocktree.insert_shreds(shreds1).unwrap(); + // skip over shred 5 so the `slot_meta.consumed` doesn't increment + blocktree.insert_shreds(shreds[6..7].to_vec()).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); - assert!(!should_insert(&slot_meta, 6, 0, false, check_data_cf, 0)); + let index = index_cf.get(0).unwrap().unwrap(); + assert!(!Blocktree::should_insert_data_shred( + &shreds[6], + &slot_meta, + index.data(), + &last_root + )); - // Trying to insert another "is_last" shred with index < the received index - // should fail - // skip over shred 5 and 7 - let shreds1 = shreds.drain(2..3).collect_vec(); - blocktree.insert_shreds(shreds1).unwrap(); + // Trying to insert another "is_last" shred with index < the received index should fail + // skip over shred 7 + blocktree.insert_shreds(shreds[8..9].to_vec()).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); + let index = index_cf.get(0).unwrap().unwrap(); assert_eq!(slot_meta.received, 9); - assert!(!should_insert(&slot_meta, 7, 0, true, check_data_cf, 0)); + let shred7 = { + if let Shred::Data(ref s) = shreds[7] { + Shred::LastInSlot(s.clone()) + } else { + panic!("Shred in unexpected format") + } + }; + assert!(!Blocktree::should_insert_data_shred( + &shred7, + &slot_meta, + index.data(), + &last_root + )); // Insert all pending shreds + let mut shred8 = shreds[8].clone(); blocktree.insert_shreds(shreds).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); + let index = index_cf.get(0).unwrap().unwrap(); // Trying to insert a shred with index > the "is_last" shred should fail - assert!(!should_insert( + if let Shred::Data(ref mut s) = shred8 { + s.header.common_header.slot = slot_meta.last_index + 1; + } else { + panic!("Shred in unexpected format") + } + assert!(!Blocktree::should_insert_data_shred( + &shred7, &slot_meta, - slot_meta.last_index + 1, - 0, - true, - check_data_cf, - 0 + index.data(), + &last_root )); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_should_insert_coding_shred() { + let blocktree_path = get_tmp_ledger_path!(); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + let index_cf = blocktree.db.column::(); + let last_root = RwLock::new(0); + + let mut shred = CodingShred::default(); + let slot = 1; + shred.header.position = 10; + shred.header.common_header.index = 11; + shred.header.common_header.slot = 1; + shred.header.num_coding_shreds = shred.header.position + 1; + let coding_shred = Shred::Coding(shred.clone()); + + // Insert a good coding shred + assert!(Blocktree::should_insert_coding_shred( + &coding_shred, + Index::new(slot).coding(), + &last_root + )); + + // Insertion should succeed + blocktree.insert_shreds(vec![coding_shred.clone()]).unwrap(); + + // Trying to insert the same shred again should fail + { + let index = index_cf + .get(shred.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &coding_shred, + index.coding(), + &last_root + )); + } + + shred.header.common_header.index += 1; + + // Establish a baseline that works + { + let index = index_cf + .get(shred.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(Blocktree::should_insert_coding_shred( + &Shred::Coding(shred.clone()), + index.coding(), + &last_root + )); + } + + // Trying to insert a shred with index < position should fail + { + let mut shred_ = shred.clone(); + shred_.header.common_header.index = (shred_.header.position - 1).into(); + let index = index_cf + .get(shred_.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &Shred::Coding(shred_), + index.coding(), + &last_root + )); + } + + // Trying to insert shred with num_coding == 0 should fail + { + let mut shred_ = shred.clone(); + shred_.header.num_coding_shreds = 0; + let index = index_cf + .get(shred_.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &Shred::Coding(shred_), + index.coding(), + &last_root + )); + } + + // Trying to insert shred with pos >= num_coding should fail + { + let mut shred_ = shred.clone(); + shred_.header.num_coding_shreds = shred_.header.position; + let index = index_cf + .get(shred_.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &Shred::Coding(shred_), + index.coding(), + &last_root + )); + } + + // Trying to insert with set_index with num_coding that would imply the last blob + // has index > u32::MAX should fail + { + let mut shred_ = shred.clone(); + shred_.header.num_coding_shreds = 3; + shred_.header.common_header.index = std::u32::MAX - 1; + shred_.header.position = 0; + let index = index_cf + .get(shred_.header.common_header.slot) + .unwrap() + .unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &Shred::Coding(shred_.clone()), + index.coding(), + &last_root + )); + + // Decreasing the number of num_coding_shreds will put it within the allowed limit + shred_.header.num_coding_shreds = 2; + let coding_shred = Shred::Coding(shred_); + assert!(Blocktree::should_insert_coding_shred( + &coding_shred, + index.coding(), + &last_root + )); + + // Insertion should succeed + blocktree.insert_shreds(vec![coding_shred]).unwrap(); + } + + // Trying to insert value into slot <= than last root should fail + { + let mut shred_ = shred.clone(); + let index = index_cf + .get(shred_.header.common_header.slot) + .unwrap() + .unwrap(); + shred_.header.common_header.slot = *last_root.read().unwrap(); + assert!(!Blocktree::should_insert_coding_shred( + &Shred::Coding(shred_), + index.coding(), + &last_root + )); + } + } + + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + #[test] pub fn test_insert_multiple_is_last() { let (shreds, _) = make_slot_entries(0, 0, 20); diff --git a/core/src/shred.rs b/core/src/shred.rs index 2ac44e8668..474c7446b9 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -12,7 +12,7 @@ use std::io::{Error as IOError, ErrorKind, Write}; use std::sync::Arc; use std::{cmp, io}; -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub enum Shred { FirstInSlot(FirstDataShred), FirstInFECSet(DataShred), @@ -130,7 +130,7 @@ impl Shred { } /// A common header that is present at start of every shred -#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] pub struct ShredCommonHeader { pub signature: Signature, pub slot: u64, @@ -138,7 +138,7 @@ pub struct ShredCommonHeader { } /// A common header that is present at start of every data shred -#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] pub struct DataShredHeader { _reserved: CodingShredHeader, pub common_header: ShredCommonHeader, @@ -147,14 +147,14 @@ pub struct DataShredHeader { } /// The first data shred also has parent slot value in it -#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] pub struct FirstDataShredHeader { pub data_header: DataShredHeader, pub parent: u64, } /// The coding shred header has FEC information -#[derive(Serialize, Deserialize, Default, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] pub struct CodingShredHeader { pub common_header: ShredCommonHeader, pub num_data_shreds: u16, @@ -163,19 +163,19 @@ pub struct CodingShredHeader { pub payload: Vec, } -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct FirstDataShred { pub header: FirstDataShredHeader, pub payload: Vec, } -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct DataShred { pub header: DataShredHeader, pub payload: Vec, } -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct CodingShred { pub header: CodingShredHeader, } @@ -454,7 +454,7 @@ impl Shredder { first_shred } - fn new_coding_shred( + pub fn new_coding_shred( slot: u64, index: u32, num_data: usize, diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs index ec96508241..4512b5cbea 100644 --- a/local_cluster/tests/local_cluster.rs +++ b/local_cluster/tests/local_cluster.rs @@ -303,7 +303,6 @@ fn test_listener_startup() { #[allow(unused_attributes)] #[test] #[serial] -#[ignore] fn test_snapshots_blocktree_floor() { // First set up the cluster with 1 snapshotting leader let snapshot_interval_slots = 10; @@ -347,7 +346,7 @@ fn test_snapshots_blocktree_floor() { fs::hard_link(tar, &validator_tar_path).unwrap(); let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap(); - // Start up a new node from a snapshot, wait for it to catchup with the leader + // Start up a new node from a snapshot let validator_stake = 5; cluster.add_validator( &validator_snapshot_test_config.validator_config, @@ -361,7 +360,7 @@ fn test_snapshots_blocktree_floor() { let validator_client = cluster.get_validator_client(&validator_id).unwrap(); let mut current_slot = 0; - // Make sure this validator can get repaired past the first few warmup epochs + // Let this validator run a while with repair let target_slot = slot_floor + 40; while current_slot <= target_slot { trace!("current_slot: {}", current_slot); @@ -380,6 +379,7 @@ fn test_snapshots_blocktree_floor() { // Skip the zeroth slot in blocktree that the ledger is initialized with let (first_slot, _) = blocktree.slot_meta_iterator(1).unwrap().next().unwrap(); + assert_eq!(first_slot, slot_floor); }