diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 896cc23f1c..eac0b4c55b 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -270,13 +270,6 @@ where
             }
         };
         if shred_filter(&shred, last_root) {
-            // Mark slot as dead if the current shred is on the boundary
-            // of max shreds per slot. However, let the current shred
-            // get retransmitted. It'll allow peer nodes to see this shred
-            // and trigger them to mark the slot as dead.
-            if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
-                let _ = blockstore.set_dead_slot(shred.slot());
-            }
             packet.meta.slot = shred.slot();
             packet.meta.seed = shred.seed();
             Some((shred, repair_info))
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 36f00e2fe9..3a891bbcd3 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1183,7 +1183,23 @@ impl Blockstore {
         if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
            handle_duplicate(shred);
            return Err(InsertDataShredError::Exists);
-        } else if !self.should_insert_data_shred(
+        }
+
+        if shred.last_in_slot() && shred_index < slot_meta.received && !slot_meta.is_full() {
+            // We got a last shred < slot_meta.received, which signals there's an alternative,
+            // shorter version of the slot. Because also `!slot_meta.is_full()`, this means
+            // that, for the current version of the slot, we might never get all the
+            // shreds < the current last index, never replay this slot, and make no
+            // progress (for instance if a leader sends an additional detached "last index"
+            // shred with a very high index, but none of the intermediate shreds). Ideally, we
+            // would just purge all shreds > the new last index, but because replay may have
+            // already replayed entries past the newly detected "last" shred, we instead mark
+            // the slot as dead and wait for replay to dump and repair the correct version.
+            warn!("Received *last* shred index {} less than previous shred index {}, and slot {} is not full, marking slot dead", shred_index, slot_meta.received, slot);
+            write_batch.put::<cf::DeadSlots>(slot, &true).unwrap();
+        }
+
+        if !self.should_insert_data_shred(
             &shred,
             slot_meta,
             just_inserted_data_shreds,
@@ -2553,14 +2569,18 @@ impl Blockstore {
         start_index: u64,
         allow_dead_slots: bool,
     ) -> Result<(Vec<Entry>, u64, bool)> {
+        let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
+
+        // Check if the slot is dead *after* fetching completed ranges to avoid a race
+        // where a slot is marked dead by another thread before the completed range query finishes.
+        // This should be sufficient because full slots will never be marked dead from another thread;
+        // that can only happen during entry processing in the replay stage.
         if self.is_dead(slot) && !allow_dead_slots {
             return Err(BlockstoreError::DeadSlot);
-        }
-
-        let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
-        if completed_ranges.is_empty() {
+        } else if completed_ranges.is_empty() {
             return Ok((vec![], 0, false));
         }
+        let slot_meta = slot_meta.unwrap();
+
         let num_shreds = completed_ranges
             .last()
@@ -3778,7 +3798,7 @@ pub mod tests {
     };
     use solana_storage_proto::convert::generated;
     use solana_transaction_status::{InnerInstructions, Reward, Rewards, TransactionTokenBalance};
-    use std::time::Duration;
+    use std::{sync::mpsc::channel, thread::Builder, time::Duration};

     // used for tests only
     pub(crate) fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
@@ -4361,44 +4381,6 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }

-    #[test]
-    pub fn test_insert_data_shreds_duplicate() {
-        // Create RocksDb ledger
-        let blockstore_path = get_tmp_ledger_path!();
-        {
-            let blockstore = Blockstore::open(&blockstore_path).unwrap();
-
-            // Make duplicate entries and shreds
-            let num_unique_entries = 10;
-            let (mut original_shreds, original_entries) =
-                make_slot_entries(0, 0, num_unique_entries);
-
-            // Discard first shred
-            original_shreds.remove(0);
-
-            blockstore
-                .insert_shreds(original_shreds, None, false)
-                .unwrap();
-
-            assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), vec![]);
-
-            let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true, 0);
-            let num_shreds = duplicate_shreds.len() as u64;
-            blockstore
-                .insert_shreds(duplicate_shreds, None, false)
-                .unwrap();
-
-            assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
-
-            let meta = blockstore.meta(0).unwrap().unwrap();
-            assert_eq!(meta.consumed, num_shreds);
-            assert_eq!(meta.received, num_shreds);
-            assert_eq!(meta.parent_slot, 0);
-            assert_eq!(meta.last_index, num_shreds - 1);
-        }
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
     #[test]
     fn test_data_set_completed_on_insert() {
         let ledger_path = get_tmp_ledger_path!();
@@ -8189,6 +8171,58 @@ pub mod tests {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }

+    #[test]
+    pub fn test_insert_data_shreds_same_slot_last_index() {
+        // Create RocksDb ledger
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+
+            // Create enough entries to ensure there are at least two shreds created
+            let num_unique_entries = max_ticks_per_n_shreds(1, None) + 1;
+            let (mut original_shreds, original_entries) =
+                make_slot_entries(0, 0, num_unique_entries);
+
+            // Discard first shred, so that the slot is not full
+            assert!(original_shreds.len() > 1);
+            let last_index = original_shreds.last().unwrap().index() as u64;
+            original_shreds.remove(0);
+
+            // Insert the same shreds, including the last shred specifically, multiple
+            // times
+            for _ in 0..10 {
+                blockstore
+                    .insert_shreds(original_shreds.clone(), None, false)
+                    .unwrap();
+                let meta = blockstore.meta(0).unwrap().unwrap();
+                assert!(!blockstore.is_dead(0));
+                assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), vec![]);
+                assert_eq!(meta.consumed, 0);
+                assert_eq!(meta.received, last_index + 1);
+                assert_eq!(meta.parent_slot, 0);
+                assert_eq!(meta.last_index, last_index);
+                assert!(!blockstore.is_full(0));
+            }
+
+            let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true, 0);
+            let num_shreds = duplicate_shreds.len() as u64;
+            blockstore
+                .insert_shreds(duplicate_shreds, None, false)
+                .unwrap();
+
+            assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
+
+            let meta = blockstore.meta(0).unwrap().unwrap();
+            assert_eq!(meta.consumed, num_shreds);
+            assert_eq!(meta.received, num_shreds);
+            assert_eq!(meta.parent_slot, 0);
+            assert_eq!(meta.last_index, num_shreds - 1);
+            assert!(blockstore.is_full(0));
+            assert!(!blockstore.is_dead(0));
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_duplicate_last_index() {
         let num_shreds = 2;
@@ -8208,4 +8242,273 @@ pub mod tests {
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    fn test_duplicate_last_index_mark_dead() {
+        let num_shreds = 10;
+        let smaller_last_shred_index = 5;
+        let larger_last_shred_index = 8;
+
+        let setup_test_shreds = |slot: Slot| -> Vec<Shred> {
+            let num_entries = max_ticks_per_n_shreds(num_shreds, None);
+            let (mut shreds, _) = make_slot_entries(slot, 0, num_entries);
+            shreds[smaller_last_shred_index].set_last_in_slot();
+            shreds[larger_last_shred_index].set_last_in_slot();
+            shreds
+        };
+
+        let get_expected_slot_meta_and_index_meta =
+            |blockstore: &Blockstore, shreds: Vec<Shred>| -> (SlotMeta, Index) {
+                let slot = shreds[0].slot();
+                blockstore
+                    .insert_shreds(shreds.clone(), None, false)
+                    .unwrap();
+                let meta = blockstore.meta(slot).unwrap().unwrap();
+                assert_eq!(meta.consumed, shreds.len() as u64);
+                let shreds_index = blockstore.get_index(slot).unwrap().unwrap();
+                for i in 0..shreds.len() as u64 {
+                    assert!(shreds_index.data().is_present(i));
+                }
+
+                // Cleanup the slot
+                blockstore
+                    .run_purge(slot, slot, PurgeType::PrimaryIndex)
+                    .expect("Purge database operations failed");
+                assert!(blockstore.meta(slot).unwrap().is_none());
+
+                (meta, shreds_index)
+            };
+
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            let mut slot = 0;
+            let shreds = setup_test_shreds(slot);
+
+            // Case 1: Insert in the same batch. Since we're inserting the shreds in order,
+            // any shreds > smaller_last_shred_index will not be inserted. Slot is not marked
+            // as dead because no shreds > the first "last" index shred are inserted before
+            // the "last" index shred itself is inserted.
+            let (expected_slot_meta, expected_index) = get_expected_slot_meta_and_index_meta(
+                &blockstore,
+                shreds[..=smaller_last_shred_index].to_vec(),
+            );
+            blockstore
+                .insert_shreds(shreds.clone(), None, false)
+                .unwrap();
+            assert!(blockstore.get_duplicate_slot(slot).is_some());
+            assert!(!blockstore.is_dead(slot));
+            for i in 0..num_shreds {
+                if i <= smaller_last_shred_index as u64 {
+                    assert_eq!(
+                        blockstore.get_data_shred(slot, i).unwrap().unwrap(),
+                        shreds[i as usize].payload
+                    );
+                } else {
+                    assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
+                }
+            }
+            let mut meta = blockstore.meta(slot).unwrap().unwrap();
+            meta.first_shred_timestamp = expected_slot_meta.first_shred_timestamp;
+            assert_eq!(meta, expected_slot_meta);
+            assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);
+
+            // Case 2: Inserting a duplicate with an even smaller last shred index should not
+            // mark the slot as dead since the SlotMeta is full.
+            let mut even_smaller_last_shred_duplicate =
+                shreds[smaller_last_shred_index - 1].clone();
+            even_smaller_last_shred_duplicate.set_last_in_slot();
+            // Flip a byte to create a duplicate shred
+            even_smaller_last_shred_duplicate.payload[0] =
+                std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
+            assert!(blockstore
+                .is_shred_duplicate(
+                    slot,
+                    even_smaller_last_shred_duplicate.index(),
+                    &even_smaller_last_shred_duplicate.payload,
+                    true
+                )
+                .is_some());
+            blockstore
+                .insert_shreds(vec![even_smaller_last_shred_duplicate], None, false)
+                .unwrap();
+            assert!(!blockstore.is_dead(slot));
+            for i in 0..num_shreds {
+                if i <= smaller_last_shred_index as u64 {
+                    assert_eq!(
+                        blockstore.get_data_shred(slot, i).unwrap().unwrap(),
+                        shreds[i as usize].payload
+                    );
+                } else {
+                    assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
+                }
+            }
+            let mut meta = blockstore.meta(slot).unwrap().unwrap();
+            meta.first_shred_timestamp = expected_slot_meta.first_shred_timestamp;
+            assert_eq!(meta, expected_slot_meta);
+            assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);
+
+            // Case 3: Insert shreds in reverse so that consumed will not be updated. Now on
+            // insert, the slot should be marked as dead.
+            slot += 1;
+            let mut shreds = setup_test_shreds(slot);
+            shreds.reverse();
+            blockstore
+                .insert_shreds(shreds.clone(), None, false)
+                .unwrap();
+            assert!(blockstore.is_dead(slot));
+            // All shreds other than the two shreds marked as last will be inserted; those two
+            // are rejected because they are marked as last but have an index less than the
+            // first received index == 10. The others are inserted even after the slot is
+            // marked dead on the attempted insert of the first last_index shred, since dead
+            // slots can still be inserted into.
+            for i in 0..num_shreds {
+                let shred_to_check = &shreds[i as usize];
+                let shred_index = shred_to_check.index() as u64;
+                if shred_index != smaller_last_shred_index as u64
+                    && shred_index != larger_last_shred_index as u64
+                {
+                    assert_eq!(
+                        blockstore
+                            .get_data_shred(slot, shred_index)
+                            .unwrap()
+                            .unwrap(),
+                        shred_to_check.payload
+                    );
+                } else {
+                    assert!(blockstore
+                        .get_data_shred(slot, shred_index)
+                        .unwrap()
+                        .is_none());
+                }
+            }
+
+            // Case 4: Same as Case 3, but this time insert the shreds one at a time to test
+            // that the clearing of data shreds works even after they've been committed
+            slot += 1;
+            let mut shreds = setup_test_shreds(slot);
+            shreds.reverse();
+            for shred in shreds.clone() {
+                blockstore.insert_shreds(vec![shred], None, false).unwrap();
+            }
+            assert!(blockstore.is_dead(slot));
+            // All the shreds will be inserted since dead slots can still be inserted into.
+            for i in 0..num_shreds {
+                let shred_to_check = &shreds[i as usize];
+                let shred_index = shred_to_check.index() as u64;
+                if shred_index != smaller_last_shred_index as u64
+                    && shred_index != larger_last_shred_index as u64
+                {
+                    assert_eq!(
+                        blockstore
+                            .get_data_shred(slot, shred_index)
+                            .unwrap()
+                            .unwrap(),
+                        shred_to_check.payload
+                    );
+                } else {
+                    assert!(blockstore
+                        .get_data_shred(slot, shred_index)
+                        .unwrap()
+                        .is_none());
+                }
+            }
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    fn test_get_slot_entries_dead_slot_race() {
+        let setup_test_shreds = move |slot: Slot| -> Vec<Shred> {
+            let num_shreds = 10;
+            let middle_shred_index = 5;
+            let num_entries = max_ticks_per_n_shreds(num_shreds, None);
+            let (shreds, _) = make_slot_entries(slot, 0, num_entries);
+
+            // Reverse shreds so that last shred gets inserted first and sets meta.received
+            let mut shreds: Vec<_> = shreds.into_iter().rev().collect();
+
+            // Push the real middle shred to the end of the shreds list
+            shreds.push(shreds[middle_shred_index].clone());
+
+            // Set the middle shred as a last shred to cause the slot to be marked dead
+            shreds[middle_shred_index].set_last_in_slot();
+            shreds
+        };
+
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+            let (slot_sender, slot_receiver) = channel();
+            let (shred_sender, shred_receiver) = channel::<Vec<Shred>>();
+            let (signal_sender, signal_receiver) = channel();
+
+            let t_entry_getter = {
+                let blockstore = blockstore.clone();
+                let signal_sender = signal_sender.clone();
+                Builder::new()
+                    .spawn(move || {
+                        while let Ok(slot) = slot_receiver.recv() {
+                            match blockstore.get_slot_entries_with_shred_info(slot, 0, false) {
+                                Ok((_entries, _num_shreds, is_full)) => {
+                                    if is_full {
+                                        signal_sender
+                                            .send(Err(IoError::new(
+                                                ErrorKind::Other,
+                                                "got full slot entries for dead slot",
+                                            )))
+                                            .unwrap();
+                                    }
+                                }
+                                Err(err) => {
+                                    assert_matches!(err, BlockstoreError::DeadSlot);
+                                }
+                            }
+                            signal_sender.send(Ok(())).unwrap();
+                        }
+                    })
+                    .unwrap()
+            };
+
+            let t_shred_inserter = Builder::new()
+                .spawn(move || {
+                    while let Ok(shreds) = shred_receiver.recv() {
+                        let slot = shreds[0].slot();
+                        // Grab this lock to block `get_slot_entries` before it fetches completed datasets,
+                        // and then mark the slot as dead, but full, by inserting carefully crafted shreds.
+                        let _lowest_cleanup_slot = blockstore.lowest_cleanup_slot.write().unwrap();
+                        blockstore.insert_shreds(shreds, None, false).unwrap();
+                        assert!(blockstore.get_duplicate_slot(slot).is_some());
+                        assert!(blockstore.is_dead(slot));
+                        assert!(blockstore.meta(slot).unwrap().unwrap().is_full());
+                        signal_sender.send(Ok(())).unwrap();
+                    }
+                })
+                .unwrap();
+
+            for slot in 0..100 {
+                let shreds = setup_test_shreds(slot);
+
+                // Start a task on each thread to trigger a race condition
+                slot_sender.send(slot).unwrap();
+                shred_sender.send(shreds).unwrap();
+
+                // Check that each thread processed its task before continuing
+                for _ in 1..=2 {
+                    let res = signal_receiver.recv().unwrap();
+                    assert!(res.is_ok(), "race condition: {:?}", res);
+                }
+            }
+
+            drop(slot_sender);
+            drop(shred_sender);
+
+            let handles = vec![t_entry_getter, t_shred_inserter];
+            for handle in handles {
+                assert!(handle.join().is_ok());
+            }
+        }
+
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
 }
diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs
index 00737b3c32..586a61adb5 100644
--- a/ledger/src/blockstore_meta.rs
+++ b/ledger/src/blockstore_meta.rs
@@ -125,6 +125,10 @@ impl ShredIndex {
             self.set_present(idx, present);
         }
     }
+
+    pub fn largest(&self) -> Option<u64> {
+        self.index.iter().rev().next().copied()
+    }
 }

 impl SlotMeta {
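Aside: the crux of the change above is a single predicate in `insert_data_shred`, plus a descending lookup over the set of shred indices. The sketch below restates both pieces as standalone functions; the names and the bare `BTreeSet` are illustrative simplifications, not the actual `Blockstore`/`ShredIndex` types from the patch.

```rust
use std::collections::BTreeSet;

/// Simplified restatement of the new check in `insert_data_shred`: a shred flagged
/// last-in-slot whose index is below `received` while the slot is not yet full
/// signals a shorter, conflicting version of the slot, so the slot is marked dead.
fn signals_shorter_slot_version(
    last_in_slot: bool,
    shred_index: u64,
    received: u64,
    is_full: bool,
) -> bool {
    last_in_slot && shred_index < received && !is_full
}

/// Mirrors the new `ShredIndex::largest()`: the reverse iterator over a `BTreeSet`
/// yields keys in descending order, so the first element is the maximum index seen.
fn largest(index: &BTreeSet<u64>) -> Option<u64> {
    index.iter().rev().next().copied()
}

fn main() {
    // A "last" shred at index 5 arrives after index 9 was already received and the
    // slot is not full: the slot should be marked dead.
    assert!(signals_shorter_slot_version(true, 5, 10, false));
    // A full slot is never marked dead by this path.
    assert!(!signals_shorter_slot_version(true, 5, 10, true));

    let index: BTreeSet<u64> = [3, 7, 5].into_iter().collect();
    assert_eq!(largest(&index), Some(7));
}
```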