diff --git a/core/benches/blocktree.rs b/core/benches/blocktree.rs index 9edb41c45c..7f09847a52 100644 --- a/core/benches/blocktree.rs +++ b/core/benches/blocktree.rs @@ -18,7 +18,7 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec, ledger_path: &Pa Blocktree::open(ledger_path).expect("Expected to be able to open database ledger"); bench.iter(move || { let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); }); Blocktree::destroy(ledger_path).expect("Expected successful database destruction"); @@ -38,7 +38,7 @@ fn setup_read_bench( // Convert the entries to shreds, write the shreds to the ledger let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true); blocktree - .insert_shreds(shreds) + .insert_shreds(shreds, None) .expect("Expectd successful insertion of shreds into ledger"); } @@ -130,7 +130,7 @@ fn bench_insert_data_shred_small(bench: &mut Bencher) { let entries = make_tiny_test_entries(num_entries); bench.iter(move || { let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); }); Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } @@ -145,7 +145,7 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) { let entries = make_large_test_entries(num_entries); bench.iter(move || { let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); }); Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 5fdef264d2..7861ef99fc 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -33,6 +33,7 @@ use std::sync::{Arc, RwLock}; pub use self::meta::*; pub use self::rooted_slot_iterator::*; +use crate::leader_schedule_cache::LeaderScheduleCache; use solana_sdk::timing::Slot; mod db; @@ -412,7 +413,11 @@ impl Blocktree { recovered_data_shreds } - pub fn insert_shreds(&self, shreds: Vec) -> Result<()> { + pub fn insert_shreds( + &self, + shreds: Vec, + leader_schedule: Option<&Arc>, + ) -> Result<()> { let db = &*self.db; let mut batch_processor = self.batch_processor.write().unwrap(); let mut write_batch = batch_processor.batch()?; @@ -443,22 +448,29 @@ impl Blocktree { } }); - let recovered_data = Self::try_shred_recovery( - &db, - &erasure_metas, - &index_working_set, - &mut just_inserted_data_shreds, - &mut just_inserted_coding_shreds, - ); - - recovered_data.into_iter().for_each(|shred| { - self.insert_recovered_data_shred( - &shred, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, + if let Some(leader_schedule_cache) = leader_schedule { + let recovered_data = Self::try_shred_recovery( + &db, + &erasure_metas, + &index_working_set, + &mut just_inserted_data_shreds, + &mut just_inserted_coding_shreds, ); - }); + + recovered_data.into_iter().for_each(|shred| { + if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { + if shred.verify(&leader) { + self.check_insert_data_shred( + shred, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_coding_shreds, + ) + } + } + }); + } // Handle chaining for the working set handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?; @@ -495,34 +507,6 @@ impl Blocktree { Ok(()) } - fn insert_recovered_data_shred( - &self, - shred: &Shred, - index_working_set: &mut HashMap, - slot_meta_working_set: &mut HashMap, - write_batch: &mut WriteBatch, - ) { - let slot = shred.slot(); - let (index_meta, mut new_index_meta) = - get_index_meta_entry(&self.db, slot, index_working_set); - let (slot_meta_entry, mut new_slot_meta_entry) = - get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent()); - - let insert_ok = { - let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap()); - let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap()); - let mut slot_meta = entry.0.borrow_mut(); - - self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch) - .is_ok() - }; - - if insert_ok { - new_index_meta.map(|n| index_working_set.insert(slot, n)); - new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n)); - } - } - fn check_insert_coding_shred( &self, shred: Shred, @@ -941,7 +925,7 @@ impl Blocktree { all_shreds.extend(shreds); let num_shreds = all_shreds.len(); - self.insert_shreds(all_shreds)?; + self.insert_shreds(all_shreds, None)?; Ok(num_shreds) } @@ -1656,7 +1640,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re .map(|s| bincode::deserialize(s).unwrap()) .collect(); - blocktree.insert_shreds(shreds)?; + blocktree.insert_shreds(shreds, None)?; blocktree.set_roots(&[0])?; Ok(last_hash) @@ -1911,7 +1895,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes"); let ledger = Blocktree::open(&ledger_path).unwrap(); - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); let mut buf = [0; 4096]; let (_, bytes) = ledger.get_data_shreds(slot, 0, 1, &mut buf).unwrap(); @@ -1974,7 +1958,7 @@ pub mod tests { // Insert last shred, we're missing the other shreds, so no consecutive // shreds starting from slot 0, index 0 should exist. let last_shred = shreds.pop().unwrap(); - ledger.insert_shreds(vec![last_shred]).unwrap(); + ledger.insert_shreds(vec![last_shred], None).unwrap(); assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty()); let meta = ledger @@ -1984,7 +1968,7 @@ pub mod tests { assert!(meta.consumed == 0 && meta.received == num_shreds); // Insert the other shreds, check for consecutive returned entries - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); let result = ledger.get_slot_entries(0, 0, None).unwrap(); assert_eq!(result, entries); @@ -2017,7 +2001,7 @@ pub mod tests { // Insert shreds in reverse, check for consecutive returned shreds for i in (0..num_shreds).rev() { let shred = shreds.pop().unwrap(); - ledger.insert_shreds(vec![shred]).unwrap(); + ledger.insert_shreds(vec![shred], None).unwrap(); let result = ledger.get_slot_entries(0, 0, None).unwrap(); let meta = ledger @@ -2095,7 +2079,7 @@ pub mod tests { let entries = make_tiny_test_entries(8); let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false); blocktree - .insert_shreds(shreds) + .insert_shreds(shreds, None) .expect("Expected successful write of shreds"); let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false); @@ -2103,7 +2087,7 @@ pub mod tests { b.set_index(8 + i as u32); } blocktree - .insert_shreds(shreds1) + .insert_shreds(shreds1, None) .expect("Expected successful write of shreds"); assert_eq!( @@ -2137,7 +2121,7 @@ pub mod tests { index += 1; } blocktree - .insert_shreds(shreds) + .insert_shreds(shreds, None) .expect("Expected successful write of shreds"); assert_eq!( blocktree @@ -2170,7 +2154,7 @@ pub mod tests { entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false); assert!(shreds.len() as u64 >= shreds_per_slot); blocktree - .insert_shreds(shreds) + .insert_shreds(shreds, None) .expect("Expected successful write of shreds"); assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries); } @@ -2198,7 +2182,7 @@ pub mod tests { odd_shreds.insert(0, shreds.remove(i as usize)); } } - blocktree.insert_shreds(odd_shreds).unwrap(); + blocktree.insert_shreds(odd_shreds, None).unwrap(); assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]); @@ -2216,7 +2200,7 @@ pub mod tests { assert_eq!(meta.last_index, std::u64::MAX); } - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert_eq!( blocktree.get_slot_entries(slot, 0, None).unwrap(), @@ -2249,13 +2233,13 @@ pub mod tests { // Discard first shred original_shreds.remove(0); - blocktree.insert_shreds(original_shreds).unwrap(); + blocktree.insert_shreds(original_shreds, None).unwrap(); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true); let num_shreds = duplicate_shreds.len() as u64; - blocktree.insert_shreds(duplicate_shreds).unwrap(); + blocktree.insert_shreds(duplicate_shreds, None).unwrap(); assert_eq!( blocktree.get_slot_entries(0, 0, None).unwrap(), @@ -2285,16 +2269,16 @@ pub mod tests { // Insert second shred, but we're missing the first shred, so no consecutive // shreds starting from slot 0, index 0 should exist. - ledger.insert_shreds(vec![shreds.remove(1)]).unwrap(); + ledger.insert_shreds(vec![shreds.remove(1)], None).unwrap(); let timer = Duration::new(1, 0); assert!(recvr.recv_timeout(timer).is_err()); // Insert first shred, now we've made a consecutive block - ledger.insert_shreds(vec![shreds.remove(0)]).unwrap(); + ledger.insert_shreds(vec![shreds.remove(0)], None).unwrap(); // Wait to get notified of update, should only be one update assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.try_recv().is_err()); // Insert the rest of the ticks - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); // Wait to get notified of update, should only be one update assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.try_recv().is_err()); @@ -2313,7 +2297,7 @@ pub mod tests { } // Should be no updates, since no new chains from block 0 were formed - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); assert!(recvr.recv_timeout(timer).is_err()); // Insert a shred for each slot that doesn't make a consecutive block, we @@ -2326,7 +2310,7 @@ pub mod tests { }) .collect(); - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); assert!(recvr.recv_timeout(timer).is_err()); // For slots 1..num_slots/2, fill in the holes in one batch insertion, @@ -2334,13 +2318,13 @@ pub mod tests { let missing_shreds2 = missing_shreds .drain((num_slots / 2) as usize..) .collect_vec(); - ledger.insert_shreds(missing_shreds).unwrap(); + ledger.insert_shreds(missing_shreds, None).unwrap(); assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.try_recv().is_err()); // Fill in the holes for each of the remaining slots, we should get a single update // for each - ledger.insert_shreds(missing_shreds2).unwrap(); + ledger.insert_shreds(missing_shreds2, None).unwrap(); // Destroying database without closing it first is undefined behavior drop(ledger); @@ -2361,11 +2345,11 @@ pub mod tests { let shred0 = shreds.remove(0); // Insert all but the first shred in the slot, should not be considered complete - ledger.insert_shreds(shreds).unwrap(); + ledger.insert_shreds(shreds, None).unwrap(); assert!(recvr.try_recv().is_err()); // Insert first shred, slot should now be considered complete - ledger.insert_shreds(vec![shred0]).unwrap(); + ledger.insert_shreds(vec![shred0], None).unwrap(); assert_eq!(recvr.try_recv().unwrap(), vec![0]); } @@ -2388,20 +2372,20 @@ pub mod tests { // Insert all but the first shred in the slot, should not be considered complete let orphan_child0 = orphan_child.remove(0); - ledger.insert_shreds(orphan_child).unwrap(); + ledger.insert_shreds(orphan_child, None).unwrap(); assert!(recvr.try_recv().is_err()); // Insert first shred, slot should now be considered complete - ledger.insert_shreds(vec![orphan_child0]).unwrap(); + ledger.insert_shreds(vec![orphan_child0], None).unwrap(); assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]); // Insert the shreds for the orphan_slot let orphan_shred0 = orphan_shreds.remove(0); - ledger.insert_shreds(orphan_shreds).unwrap(); + ledger.insert_shreds(orphan_shreds, None).unwrap(); assert!(recvr.try_recv().is_err()); // Insert first shred, slot should now be considered complete - ledger.insert_shreds(vec![orphan_shred0]).unwrap(); + ledger.insert_shreds(vec![orphan_shred0], None).unwrap(); assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]); } @@ -2428,7 +2412,7 @@ pub mod tests { .collect(); all_shreds.shuffle(&mut thread_rng()); - ledger.insert_shreds(all_shreds).unwrap(); + ledger.insert_shreds(all_shreds, None).unwrap(); let mut result = recvr.try_recv().unwrap(); result.sort(); slots.push(disconnected_slot); @@ -2452,7 +2436,7 @@ pub mod tests { let shreds1 = shreds .drain(shreds_per_slot..2 * shreds_per_slot) .collect_vec(); - blocktree.insert_shreds(shreds1).unwrap(); + blocktree.insert_shreds(shreds1, None).unwrap(); let s1 = blocktree.meta(1).unwrap().unwrap(); assert!(s1.next_slots.is_empty()); // Slot 1 is not trunk because slot 0 hasn't been inserted yet @@ -2464,7 +2448,7 @@ pub mod tests { let shreds2 = shreds .drain(shreds_per_slot..2 * shreds_per_slot) .collect_vec(); - blocktree.insert_shreds(shreds2).unwrap(); + blocktree.insert_shreds(shreds2, None).unwrap(); let s2 = blocktree.meta(2).unwrap().unwrap(); assert!(s2.next_slots.is_empty()); // Slot 2 is not trunk because slot 0 hasn't been inserted yet @@ -2482,7 +2466,7 @@ pub mod tests { // 3) Write to the zeroth slot, check that every slot // is now part of the trunk - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); for i in 0..3 { let s = blocktree.meta(i).unwrap().unwrap(); // The last slot will not chain to any other slots @@ -2530,7 +2514,7 @@ pub mod tests { } // Write the shreds for every other slot - blocktree.insert_shreds(slots).unwrap(); + blocktree.insert_shreds(slots, None).unwrap(); // Check metadata for i in 0..num_slots { @@ -2556,7 +2540,7 @@ pub mod tests { } // Write the shreds for the other half of the slots that we didn't insert earlier - blocktree.insert_shreds(missing_slots).unwrap(); + blocktree.insert_shreds(missing_slots, None).unwrap(); for i in 0..num_slots { // Check that all the slots chain correctly once the missing slots @@ -2600,9 +2584,9 @@ pub mod tests { if slot % 3 == 0 { let shred0 = shreds_for_slot.remove(0); missing_shreds.push(shred0); - blocktree.insert_shreds(shreds_for_slot).unwrap(); + blocktree.insert_shreds(shreds_for_slot, None).unwrap(); } else { - blocktree.insert_shreds(shreds_for_slot).unwrap(); + blocktree.insert_shreds(shreds_for_slot, None).unwrap(); } } @@ -2637,7 +2621,7 @@ pub mod tests { for slot_index in 0..num_slots { if slot_index % 3 == 0 { let shred = missing_shreds.remove(0); - blocktree.insert_shreds(vec![shred]).unwrap(); + blocktree.insert_shreds(vec![shred], None).unwrap(); for i in 0..num_slots { let s = blocktree.meta(i as u64).unwrap().unwrap(); @@ -2818,7 +2802,7 @@ pub mod tests { // Write slot 2, which chains to slot 1. We're missing slot 0, // so slot 1 is the orphan let shreds_for_slot = shreds.drain((shreds_per_slot * 2)..).collect_vec(); - blocktree.insert_shreds(shreds_for_slot).unwrap(); + blocktree.insert_shreds(shreds_for_slot, None).unwrap(); let meta = blocktree .meta(1) .expect("Expect database get to succeed") @@ -2829,7 +2813,7 @@ pub mod tests { // Write slot 1 which chains to slot 0, so now slot 0 is the // orphan, and slot 1 is no longer the orphan. let shreds_for_slot = shreds.drain(shreds_per_slot..).collect_vec(); - blocktree.insert_shreds(shreds_for_slot).unwrap(); + blocktree.insert_shreds(shreds_for_slot, None).unwrap(); let meta = blocktree .meta(1) .expect("Expect database get to succeed") @@ -2846,12 +2830,12 @@ pub mod tests { // nothing should change let (shred4, _) = make_slot_entries(4, 0, 1); let (shred5, _) = make_slot_entries(5, 1, 1); - blocktree.insert_shreds(shred4).unwrap(); - blocktree.insert_shreds(shred5).unwrap(); + blocktree.insert_shreds(shred4, None).unwrap(); + blocktree.insert_shreds(shred5, None).unwrap(); assert_eq!(blocktree.get_orphans(None), vec![0]); // Write zeroth slot, no more orphans - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); for i in 0..3 { let meta = blocktree .meta(i) @@ -2897,11 +2881,11 @@ pub mod tests { let num_shreds = shreds.len(); // Write shreds to the database if should_bulk_write { - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); } else { for _ in 0..num_shreds { let shred = shreds.remove(0); - blocktree.insert_shreds(vec![shred]).unwrap(); + blocktree.insert_shreds(vec![shred], None).unwrap(); } } @@ -2943,7 +2927,7 @@ pub mod tests { b.set_index(i as u32 * gap as u32); b.set_slot(slot); } - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); // Index of the first shred is 0 // Index of the second shred is "gap" @@ -3037,7 +3021,7 @@ pub mod tests { shreds[1].set_index(OTHER as u32); // Insert one shred at index = first_index - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); const STARTS: u64 = OTHER * 2; const END: u64 = OTHER * 3; @@ -3071,7 +3055,7 @@ pub mod tests { let shreds = entries_to_test_shreds(entries, slot, 0, true); let num_shreds = shreds.len(); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let empty: Vec = vec![]; for i in 0..num_shreds as u64 { @@ -3097,7 +3081,9 @@ pub mod tests { let last_root = RwLock::new(0); // Insert the first 5 shreds, we don't have a "is_last" shred yet - blocktree.insert_shreds(shreds[0..5].to_vec()).unwrap(); + blocktree + .insert_shreds(shreds[0..5].to_vec(), None) + .unwrap(); // Trying to insert a shred less than `slot_meta.consumed` should fail let slot_meta = blocktree.meta(0).unwrap().unwrap(); @@ -3112,7 +3098,9 @@ pub mod tests { // Trying to insert the same shred again should fail // skip over shred 5 so the `slot_meta.consumed` doesn't increment - blocktree.insert_shreds(shreds[6..7].to_vec()).unwrap(); + blocktree + .insert_shreds(shreds[6..7].to_vec(), None) + .unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); let index = index_cf.get(0).unwrap().unwrap(); assert!(!Blocktree::should_insert_data_shred( @@ -3124,7 +3112,9 @@ pub mod tests { // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 - blocktree.insert_shreds(shreds[8..9].to_vec()).unwrap(); + blocktree + .insert_shreds(shreds[8..9].to_vec(), None) + .unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); let index = index_cf.get(0).unwrap().unwrap(); assert_eq!(slot_meta.received, 9); @@ -3144,7 +3134,7 @@ pub mod tests { // Insert all pending shreds let mut shred8 = shreds[8].clone(); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); let index = index_cf.get(0).unwrap().unwrap(); @@ -3188,7 +3178,9 @@ pub mod tests { )); // Insertion should succeed - blocktree.insert_shreds(vec![coding_shred.clone()]).unwrap(); + blocktree + .insert_shreds(vec![coding_shred.clone()], None) + .unwrap(); // Trying to insert the same shred again should fail { @@ -3290,7 +3282,7 @@ pub mod tests { )); // Insertion should succeed - blocktree.insert_shreds(vec![coding_shred]).unwrap(); + blocktree.insert_shreds(vec![coding_shred], None).unwrap(); } // Trying to insert value into slot <= than last root should fail @@ -3319,7 +3311,7 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&blocktree_path).unwrap(); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); assert_eq!(slot_meta.consumed, num_shreds); @@ -3328,7 +3320,7 @@ pub mod tests { assert!(slot_meta.is_full()); let (shreds, _) = make_slot_entries(0, 0, 22); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let slot_meta = blocktree.meta(0).unwrap().unwrap(); assert_eq!(slot_meta.consumed, num_shreds); @@ -3350,7 +3342,7 @@ pub mod tests { let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot); let slot_8_shreds = bincode::serialize(&all_shreds[2].0).unwrap(); for (slot_shreds, _) in all_shreds { - blocktree.insert_shreds(slot_shreds).unwrap(); + blocktree.insert_shreds(slot_shreds, None).unwrap(); } // Slot doesnt exist, iterator should be empty @@ -3399,7 +3391,7 @@ pub mod tests { let blocktree = Blocktree::open(&blocktree_path).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 6); let shreds_per_slot = shreds.len() as u64 / 50; - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); blocktree .slot_meta_iterator(0) .unwrap() @@ -3434,7 +3426,7 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&blocktree_path).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 5); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); blocktree.purge_slots(0, Some(5)); @@ -3460,7 +3452,7 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&blocktree_path).unwrap(); let (shreds, _) = make_many_slot_entries(0, 5000, 10); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); blocktree.purge_slots(0, Some(4999)); diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 7490769252..83c0cf71a8 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -72,7 +72,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { self.last_blockhash = Hash::default(); } - blocktree.insert_shreds(shreds)?; + blocktree.insert_shreds(shreds, None)?; // 3) Start broadcast step let peers = cluster_info.read().unwrap().tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 65420d87ff..5c6afe947c 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -53,7 +53,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); - blocktree.insert_shreds(shreds)?; + blocktree.insert_shreds(shreds, None)?; // 3) Start broadcast step let bank_epoch = bank.get_stakers_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 9cbb4a4038..2b7387feb4 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -92,7 +92,7 @@ impl BroadcastRun for StandardBroadcastRun { let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); let num_shreds = all_shreds.len(); blocktree - .insert_shreds(all_shreds) + .insert_shreds(all_shreds, None) .expect("Failed to insert shreds in blocktree"); let to_blobs_elapsed = to_blobs_start.elapsed(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4c27c74aeb..95051cfa46 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1932,7 +1932,7 @@ mod tests { shred.set_index(1); blocktree - .insert_shreds(vec![shred]) + .insert_shreds(vec![shred], None) .expect("Expect successful ledger write"); let rv = ClusterInfo::run_window_request( @@ -2010,7 +2010,7 @@ mod tests { let (blobs, _) = make_many_slot_entries(1, 3, 5); blocktree - .insert_shreds(blobs) + .insert_shreds(blobs, None) .expect("Expect successful ledger write"); // We don't have slot 4, so we don't know how to service this requeset diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index cb3265cb11..2c4feed265 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -627,7 +627,7 @@ mod tests { let num_shreds_per_slot = shreds.len() as u64 / num_slots; // Write slots in the range [0, num_slots] to blocktree - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); // Write roots so that these slots will qualify to be sent by the repairman let roots: Vec<_> = (0..=num_slots - 1).collect(); @@ -704,7 +704,7 @@ mod tests { // Create blobs for first two epochs and write them to blocktree let total_slots = slots_per_epoch * 2; let (shreds, _) = make_many_slot_entries(0, total_slots, 1); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); // Write roots so that these slots will qualify to be sent by the repairman let roots: Vec<_> = (0..=slots_per_epoch * 2 - 1).collect(); diff --git a/core/src/leader_schedule_cache.rs b/core/src/leader_schedule_cache.rs index a6bb144fab..007d5552b6 100644 --- a/core/src/leader_schedule_cache.rs +++ b/core/src/leader_schedule_cache.rs @@ -62,6 +62,8 @@ impl LeaderScheduleCache { pub fn slot_leader_at(&self, slot: u64, bank: Option<&Bank>) -> Option { if let Some(bank) = bank { self.slot_leader_at_else_compute(slot, bank) + } else if self.epoch_schedule.slots_per_epoch == 0 { + None } else { self.slot_leader_at_no_compute(slot) } @@ -400,7 +402,7 @@ mod tests { // Write a blob into slot 2 that chains to slot 1, // but slot 1 is empty so should not be skipped let (shreds, _) = make_slot_entries(2, 1, 1); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert_eq!( cache .next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)) @@ -413,7 +415,7 @@ mod tests { let (shreds, _) = make_slot_entries(1, 0, 1); // Check that slot 1 and 2 are skipped - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert_eq!( cache .next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 6d21bb5b74..ed1c8dec8d 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -84,7 +84,7 @@ mod tests { let blocktree_path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&blocktree_path).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 5); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let blocktree = Arc::new(blocktree); let (sender, receiver) = channel(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 1ca75f63df..c2adc17e1f 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -423,7 +423,7 @@ mod test { let (mut shreds, _) = make_slot_entries(1, 0, 1); let (shreds2, _) = make_slot_entries(5, 2, 1); shreds.extend(shreds2); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert_eq!( RepairService::generate_repairs(&blocktree, 0, 2).unwrap(), vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)] @@ -443,7 +443,7 @@ mod test { // Write this blob to slot 2, should chain to slot 0, which we haven't received // any blobs for - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); // Check that repair tries to patch the empty slot assert_eq!( @@ -479,7 +479,7 @@ mod test { missing_indexes_per_slot.insert(0, index); } } - blocktree.insert_shreds(shreds_to_write).unwrap(); + blocktree.insert_shreds(shreds_to_write, None).unwrap(); let expected: Vec = (0..num_slots) .flat_map(|slot| { @@ -517,7 +517,7 @@ mod test { // Remove last shred (which is also last in slot) so that slot is not complete shreds.pop(); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); // We didn't get the last blob for this slot, so ask for the highest blob for that slot let expected: Vec = @@ -543,7 +543,7 @@ mod test { let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); for (mut slot_shreds, _) in shreds.into_iter() { slot_shreds.remove(0); - blocktree.insert_shreds(slot_shreds).unwrap(); + blocktree.insert_shreds(slot_shreds, None).unwrap(); } // Iterate through all possible combinations of start..end (inclusive on both @@ -595,7 +595,7 @@ mod test { let parent = if i > 0 { i - 1 } else { 0 }; let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); } let end = 4; @@ -648,8 +648,10 @@ mod test { .collect(); let mut full_slots = BTreeSet::new(); - blocktree.insert_shreds(fork1_shreds).unwrap(); - blocktree.insert_shreds(fork2_incomplete_shreds).unwrap(); + blocktree.insert_shreds(fork1_shreds, None).unwrap(); + blocktree + .insert_shreds(fork2_incomplete_shreds, None) + .unwrap(); // Test that only slots > root from fork1 were included let epoch_schedule = EpochSchedule::new(32, 32, false); @@ -672,7 +674,7 @@ mod test { .into_iter() .flat_map(|(shreds, _)| shreds) .collect(); - blocktree.insert_shreds(fork3_shreds).unwrap(); + blocktree.insert_shreds(fork3_shreds, None).unwrap(); RepairService::get_completed_slots_past_root( &blocktree, &mut full_slots, @@ -719,7 +721,7 @@ mod test { let step = rng.gen_range(1, max_step + 1) as usize; let step = std::cmp::min(step, num_shreds - i); let shreds_to_insert = shreds.drain(..step).collect_vec(); - blocktree_.insert_shreds(shreds_to_insert).unwrap(); + blocktree_.insert_shreds(shreds_to_insert, None).unwrap(); sleep(Duration::from_millis(repair_interval_ms)); i += step; } @@ -749,7 +751,7 @@ mod test { // Update with new root, should filter out the slots <= root root = num_slots / 2; let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); RepairService::update_epoch_slots( Pubkey::default(), root, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index db06c472a6..e8f132f223 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -897,7 +897,7 @@ mod test { // Insert blob for slot 1, generate new forks, check result let (shreds, _) = make_slot_entries(1, 0, 8); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert!(bank_forks.get(1).is_none()); ReplayStage::generate_new_bank_forks( &blocktree, @@ -908,7 +908,7 @@ mod test { // Insert blob for slot 3, generate new forks, check result let (shreds, _) = make_slot_entries(2, 0, 8); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); assert!(bank_forks.get(2).is_none()); ReplayStage::generate_new_bank_forks( &blocktree, @@ -1034,7 +1034,7 @@ mod test { let last_blockhash = bank0.last_blockhash(); progress.insert(bank0.slot(), ForkProgress::new(last_blockhash)); let shreds = shred_to_insert(&last_blockhash, bank0.slot()); - blocktree.insert_shreds(shreds).unwrap(); + blocktree.insert_shreds(shreds, None).unwrap(); let (res, _tx_count) = ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 1b85305c4c..669517e40f 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -4,6 +4,7 @@ use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}; use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::gossip_service::GossipService; +use crate::leader_schedule_cache::LeaderScheduleCache; use crate::packet::to_shared_blob; use crate::recycler::Recycler; use crate::repair_service; @@ -474,6 +475,7 @@ impl Replicator { repair_socket, &exit, RepairStrategy::RepairRange(repair_slot_range), + &Arc::new(LeaderScheduleCache::default()), |_, _, _, _| true, ); info!("waiting for ledger download"); @@ -873,7 +875,7 @@ impl Replicator { .iter() .filter_map(|p| bincode::deserialize(&p.data).ok()) .collect(); - blocktree.insert_shreds(shreds)?; + blocktree.insert_shreds(shreds, None)?; } // check if all the slots in the segment are complete if Self::segment_complete(start_slot, slots_per_segment, blocktree) { diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8f9fde3c80..5dee03e18a 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -150,6 +150,7 @@ impl RetransmitStage { repair_socket, exit, repair_strategy, + &leader_schedule_cache.clone(), move |id, shred, shred_buf, working_bank| { should_retransmit_and_persist( shred, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 4851b1a34a..5d19f631da 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -25,11 +25,6 @@ use std::time::{Duration, Instant}; pub const NUM_THREADS: u32 = 10; -/// Process a blob: Add blob to the ledger window. -pub fn process_shreds(shreds: Vec, blocktree: &Arc) -> Result<()> { - blocktree.insert_shreds(shreds) -} - /// drop blobs that are from myself or not from the correct leader for the /// blob's slot pub fn should_retransmit_and_persist( @@ -67,6 +62,7 @@ fn recv_window( retransmit: &PacketSender, shred_filter: F, thread_pool: &ThreadPool, + leader_schedule_cache: &Arc, ) -> Result<()> where F: Fn(&Shred, &[u8]) -> bool, @@ -117,7 +113,7 @@ where let _ = retransmit.send(packets); } - blocktree.insert_shreds(shreds)?; + blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?; trace!( "Elapsed processing time in recv_window(): {}", @@ -160,6 +156,7 @@ impl WindowService { repair_socket: Arc, exit: &Arc, repair_strategy: RepairStrategy, + leader_schedule_cache: &Arc, shred_filter: F, ) -> WindowService where @@ -184,6 +181,7 @@ impl WindowService { let exit = exit.clone(); let shred_filter = Arc::new(shred_filter); let bank_forks = bank_forks.clone(); + let leader_schedule_cache = leader_schedule_cache.clone(); let t_window = Builder::new() .name("solana-window".to_string()) // TODO: Mark: Why is it overflowing @@ -218,6 +216,7 @@ impl WindowService { ) }, &thread_pool, + &leader_schedule_cache, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -296,11 +295,12 @@ mod test { let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); let num_entries = 10; let original_entries = make_tiny_test_entries(num_entries); - let shreds = local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); - - for shred in shreds.into_iter().rev() { - process_shreds(vec![shred], &blocktree).expect("Expect successful processing of blob"); - } + let mut shreds = + local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); + shreds.reverse(); + blocktree + .insert_shreds(shreds, None) + .expect("Expect successful processing of shred"); assert_eq!( blocktree.get_slot_entries(0, 0, None).unwrap(), @@ -411,6 +411,7 @@ mod test { Arc::new(leader_node.sockets.repair), &exit, repair_strategy, + &Arc::new(LeaderScheduleCache::default()), |_, _, _, _| true, ); let t_responder = { @@ -500,6 +501,7 @@ mod test { Arc::new(leader_node.sockets.repair), &exit, repair_strategy, + &Arc::new(LeaderScheduleCache::default()), |_, _, _, _| true, ); let t_responder = {