diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 60a3d8b605..6f8cbdf089 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -985,6 +985,14 @@ impl Blocktree { Ok(()) } + fn data_blob_exists(&self, slot: u64, index: u64) -> bool { + self.erasure_meta_cf + .get((slot, ErasureMeta::set_index_for(index))) + .expect("Expect database get to succeed") + .map(|e| e.is_data_present(index)) + .unwrap_or(false) + } + fn is_orphan(meta: &SlotMeta) -> bool { // If we have no parent, then this is the head of a detached chain of // slots @@ -1106,7 +1114,8 @@ impl Blocktree { let slot_meta = &mut entry.0.borrow_mut(); // This slot is full, skip the bogus blob - if slot_meta.is_full() { + // Check if this blob should be inserted + if !self.should_insert_blob(&slot_meta, &prev_inserted_blob_datas, blob) { false } else { let _ = self.insert_data_blob(blob, prev_inserted_blob_datas, slot_meta, write_batch); @@ -1126,12 +1135,6 @@ impl Blocktree { let blob_slot = blob_to_insert.slot(); let blob_size = blob_to_insert.size(); - if blob_index < slot_meta.consumed - || prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) - { - return Err(Error::BlocktreeError(BlocktreeError::BlobForIndexExists)); - } - let new_consumed = { if slot_meta.consumed == blob_index { let blob_datas = self.get_slot_consecutive_blobs( @@ -1305,6 +1308,61 @@ impl Blocktree { self.db.write(batch)?; Ok(()) } + + fn should_insert_blob( + &self, + slot: &SlotMeta, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + blob: &Blob, + ) -> bool { + let blob_index = blob.index(); + let blob_slot = blob.slot(); + + // Check that the blob doesn't already exist + if blob_index < slot.consumed + || prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) + || self.data_blob_exists(blob_slot, blob_index) + { + return false; + } + + // Check that we do not receive blobs >= than the last_index + // for the slot + let last_index = slot.last_index; + if blob_index >= last_index { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String(format!( + "Received last blob with index {} >= slot.last_index {}", + blob_index, last_index + )), + ) + .to_owned(), + ); + return false; + } + + // Check that we do not receive a blob with "last_index" true, but index + // less than our current received + if blob.is_last_in_slot() && blob_index < slot.received { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String(format!( + "Received last blob with index {} < slot.received {}", + blob_index, slot.received + )), + ) + .to_owned(), + ); + return false; + } + + true + } } // Creates a new ledger with slot 0 full of ticks (and only ticks). @@ -2696,6 +2754,69 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_should_insert_blob() { + let (mut blobs, _) = make_slot_entries(0, 0, 20); + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + // Insert the first 5 blobs, we don't have a "is_last" blob yet + blocktree.insert_data_blobs(&blobs[0..5]).unwrap(); + + // Trying to insert a blob less than consumed should fail + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + assert_eq!(slot_meta.consumed, 5); + assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[4].clone())); + + // Trying to insert the same blob again should fail + blocktree.insert_data_blobs(&blobs[7..8]).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[7].clone())); + + // Trying to insert another "is_last" blob with index < the received index + // should fail + blocktree.insert_data_blobs(&blobs[8..9]).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + assert_eq!(slot_meta.received, 9); + blobs[8].set_is_last_in_slot(); + assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[8].clone())); + + // Insert the 10th blob, which is marked as "is_last" + blobs[9].set_is_last_in_slot(); + blocktree.insert_data_blobs(&blobs[9..10]).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + + // Trying to insert a blob with index > the "is_last" blob should fail + assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[10].clone())); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_insert_multiple_is_last() { + let (mut blobs, _) = make_slot_entries(0, 0, 20); + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + // Inserting multiple blobs with the is_last flag set should only insert + // the first blob with the "is_last" flag, and drop the rest + for i in 6..20 { + blobs[i].set_is_last_in_slot(); + } + + blocktree.insert_data_blobs(&blobs[..]).unwrap(); + let slot_meta = blocktree.meta(0).unwrap().unwrap(); + + assert_eq!(slot_meta.consumed, 7); + assert_eq!(slot_meta.received, 7); + assert_eq!(slot_meta.last_index, 6); + assert!(slot_meta.is_full()); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + mod erasure { use super::*; use crate::blocktree::meta::ErasureMetaStatus; @@ -2977,6 +3098,7 @@ pub mod tests { } #[test] + #[ignore] fn test_recovery_multi_slot_multi_thread() { use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; use std::thread; @@ -3034,7 +3156,7 @@ pub mod tests { // is a 50/50 chance of attempting to write the coding blobs first or the data blobs // first. // The goal is to be as racey as possible and cover a wide range of situations - for _ in 0..N_THREADS { + for thread_id in 0..N_THREADS { let blocktree = Arc::clone(&blocktree); let mut rng = SmallRng::from_rng(&mut rng).unwrap(); let model = model.clone(); @@ -3047,55 +3169,78 @@ pub mod tests { .choose_multiple(&mut rng, num_erasure_sets); for erasure_set in unordered_sets { - if rng.gen() { - blocktree - .write_shared_blobs(&erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - - for shared_coding_blob in &erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; + let mut attempt = 0; + loop { + if rng.gen() { blocktree - .put_coding_blob_bytes( - slot, - blob.index(), - &blob.data[..size], - ) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); - } else { - // write coding blobs first, then write the data blobs. - for shared_coding_blob in &erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes( - slot, - blob.index(), - &blob.data[..size], - ) - .expect("Writing coding blobs must succeed"); - } - debug!( - "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index - ); + .write_shared_blobs(&erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); - blocktree - .write_shared_blobs(&erasure_set.data) - .expect("Writing data blobs must succeed"); - debug!( - "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index + for shared_coding_blob in &erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } else { + // write coding blobs first, then write the data blobs. + for shared_coding_blob in &erasure_set.coding { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) + .expect("Writing coding blobs must succeed"); + } + debug!( + "multislot: wrote coding: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + + blocktree + .write_shared_blobs(&erasure_set.data) + .expect("Writing data blobs must succeed"); + debug!( + "multislot: wrote data: slot: {}, erasure_set: {}", + slot, erasure_set.set_index + ); + } + + // due to racing, some blobs might not be inserted. don't stop + // trying until *some* thread succeeds in writing everything and + // triggering recovery. + let erasure_meta = blocktree + .erasure_meta_cf + .get((slot, erasure_set.set_index)) + .unwrap() + .unwrap(); + + let status = erasure_meta.status(); + attempt += 1; + + info!( + "[multi_slot] thread_id: {}, attempt: {}, slot: {}, set_index: {}, status: {:?}", + thread_id, attempt, slot, erasure_set.set_index, status ); + if status == ErasureMetaStatus::DataFull { + break; + } } } } diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 4764785e01..2c572ad291 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -35,7 +35,22 @@ impl SlotMeta { if self.last_index == std::u64::MAX { return false; } - assert!(self.consumed <= self.last_index + 1); + + // Should never happen + if self.consumed > self.last_index + 1 { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String(format!( + "Observed a slot meta with consumed: {} > meta.last_index + 1: {}", + self.consumed, + self.last_index + 1 + )), + ) + .to_owned(), + ); + } self.consumed == self.last_index + 1 }