Weight repair slots based on vote stake (#10741)
* Weight repair slots based on vote stake
* Add test
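
In short: the Arc<VoteTracker> defined in cluster_info_vote_listener is threaded from Tvu through RetransmitStage and WindowService down into RepairService, and generate_repairs_range now walks the unrepaired fork tree one level at a time, choosing which slot at the current level to pull repair requests from by sampling in proportion to the vote stake recorded for that slot, with a floor of 1 for slots that have no recorded votes.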
@@ -49,7 +49,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
 pub struct SlotVoteTracker {
     voted: HashSet<Arc<Pubkey>>,
     updates: Option<Vec<Arc<Pubkey>>>,
-    total_stake: u64,
+    pub total_stake: u64,
 }
 
 impl SlotVoteTracker {
@@ -62,7 +62,7 @@ impl SlotVoteTracker {
 #[derive(Default)]
 pub struct VoteTracker {
     // Map from a slot to a set of validators who have voted for that slot
-    slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
+    pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
     // Don't track votes from people who are not staked, acts as a spam filter
     epoch_authorized_voters: RwLock<HashMap<Epoch, Arc<EpochAuthorizedVoters>>>,
     leader_schedule_epoch: RwLock<Epoch>,
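
Both fields become pub so RepairService, a sibling module, can read the per-slot stake totals directly. A minimal sketch of that lookup, assuming simplified stand-in types (only the fields used here) rather than the real SlotVoteTracker/VoteTracker definitions:

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    type Slot = u64;

    // Stand-ins that keep only the fields this sketch touches.
    #[derive(Default)]
    struct SlotVoteTracker {
        pub total_stake: u64,
    }

    #[derive(Default)]
    struct VoteTracker {
        pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
    }

    // Repair weight for a slot: the stake observed voting for it, floored at 1
    // so a slot with no recorded votes can still be selected.
    fn repair_weight(vote_tracker: &VoteTracker, slot: Slot) -> u64 {
        let trackers = vote_tracker.slot_vote_trackers.read().unwrap();
        trackers
            .get(&slot)
            .map(|t| std::cmp::max(t.read().unwrap().total_stake, 1))
            .unwrap_or(1)
    }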
@@ -2,12 +2,16 @@
 //! regularly finds missing shreds in the ledger and sends repair requests for those shreds
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     commitment::VOTE_THRESHOLD_SIZE,
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use rand::distributions::{Distribution, WeightedIndex};
+use rand::{thread_rng, Rng, SeedableRng};
+use rand_chacha::ChaChaRng;
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
     shred::Nonce,
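
The rand machinery imported above is what the weighting relies on: WeightedIndex samples an index with probability proportional to its weight, and a ChaChaRng seeded from thread_rng gives the sampler its randomness. A hypothetical standalone demo of the two pieces together (not code from this patch):

    use rand::distributions::{Distribution, WeightedIndex};
    use rand::{thread_rng, Rng, SeedableRng};
    use rand_chacha::ChaChaRng;

    fn main() {
        // Seed a dedicated ChaCha RNG from the thread-local RNG, as the patch does.
        let mut seed = [0u8; 32];
        thread_rng().fill(&mut seed);
        let mut rng = ChaChaRng::from_seed(seed);
        // Index 2 is five times as likely to be drawn as index 0.
        let dist = WeightedIndex::new(vec![1u64, 2, 5]).unwrap();
        println!("sampled index: {}", dist.sample(&mut rng));
    }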
@@ -113,6 +117,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
@@ -124,6 +129,7 @@ impl RepairService {
                     cluster_info,
                     repair_info,
                     &cluster_slots,
+                    vote_tracker,
                 )
             })
             .unwrap();
@@ -138,6 +144,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: &ClusterSlots,
+        vote_tracker: Arc<VoteTracker>,
     ) {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.id();
@@ -190,6 +197,7 @@ impl RepairService {
                     root_bank.slot(),
                     MAX_REPAIR_LENGTH,
                     &duplicate_slot_repair_statuses,
+                    &vote_tracker,
                 )
             };
 
@@ -271,6 +279,7 @@ impl RepairService {
         root: Slot,
         max_repairs: usize,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) -> Result<Vec<RepairType>> {
         // Slot height and shred indexes for shreds we want to repair
         let mut repairs: Vec<RepairType> = vec![];
@@ -280,10 +289,9 @@ impl RepairService {
             max_repairs,
             root,
             duplicate_slot_repair_statuses,
+            vote_tracker,
         );
 
-        // TODO: Incorporate gossip to determine priorities for repair?
-
         // Try to resolve orphans in blockstore
         let orphans = blockstore.orphans_iterator(root + 1).unwrap();
         Self::generate_repairs_for_orphans(orphans, &mut repairs);
@@ -526,27 +534,64 @@ impl RepairService {
         max_repairs: usize,
         slot: Slot,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) {
+        let mut seed = [0u8; 32];
+        thread_rng().fill(&mut seed);
+        let rng = &mut ChaChaRng::from_seed(seed);
         let mut pending_slots = vec![slot];
         while repairs.len() < max_repairs && !pending_slots.is_empty() {
-            let slot = pending_slots.pop().unwrap();
-            if duplicate_slot_repair_statuses.contains_key(&slot) {
-                // These are repaired through a different path
-                continue;
-            }
+            pending_slots.retain(|slot| !duplicate_slot_repair_statuses.contains_key(slot));
+            let mut next_pending_slots = vec![];
+            let mut level_repairs = HashMap::new();
+            for slot in &pending_slots {
+                if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
+                    let new_repairs = Self::generate_repairs_for_slot(
+                        blockstore,
+                        *slot,
+                        &slot_meta,
+                        std::usize::MAX,
+                    );
+                    if !new_repairs.is_empty() {
+                        level_repairs.insert(*slot, new_repairs);
+                    }
+                    next_pending_slots.extend(slot_meta.next_slots);
+                }
+            }
-            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
-                let new_repairs = Self::generate_repairs_for_slot(
-                    blockstore,
-                    slot,
-                    &slot_meta,
-                    max_repairs - repairs.len(),
-                );
-                repairs.extend(new_repairs);
-                let next_slots = slot_meta.next_slots;
-                pending_slots.extend(next_slots);
-            } else {
-                break;
-            }
+
+            if !level_repairs.is_empty() {
+                let mut slots_to_repair: Vec<_> = level_repairs.keys().cloned().collect();
+                let mut weights: Vec<_> = {
+                    let r_vote_tracker = vote_tracker.slot_vote_trackers.read().unwrap();
+                    slots_to_repair
+                        .iter()
+                        .map(|slot| {
+                            if let Some(slot_vote_tracker) = r_vote_tracker.get(slot) {
+                                std::cmp::max(slot_vote_tracker.read().unwrap().total_stake, 1)
+                            } else {
+                                // should it be something else?
+                                1
+                            }
+                        })
+                        .collect()
+                };
+
+                let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                while repairs.len() < max_repairs && !level_repairs.is_empty() {
+                    let index = weighted_index.sample(rng);
+                    let slot_repairs = level_repairs.get_mut(&slots_to_repair[index]).unwrap();
+                    repairs.push(slot_repairs.remove(0));
+                    if slot_repairs.is_empty() {
+                        level_repairs.remove(&slots_to_repair[index]);
+                        slots_to_repair.remove(index);
+                        weights.remove(index);
+                        if !weights.is_empty() {
+                            weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                        }
+                    }
+                }
+            }
+            pending_slots = next_pending_slots;
         }
     }
 
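The rewritten loop above makes repair generation breadth-first and stake-weighted: for each level of the fork tree it first collects every missing-shred repair per slot (level_repairs), then drains those per-slot lists by repeatedly sampling a slot with probability proportional to its weight and taking that slot's next repair, rebuilding the WeightedIndex whenever a slot's list empties. A minimal sketch of that drain pattern in isolation, with illustrative names and String standing in for RepairType (not code from the patch):

    use rand::distributions::{Distribution, WeightedIndex};
    use rand::thread_rng;

    // Drain several work queues, choosing the next queue in proportion to its
    // weight and rebuilding the distribution whenever a queue runs dry.
    fn drain_weighted(mut queues: Vec<(u64, Vec<String>)>, max_items: usize) -> Vec<String> {
        let mut out = Vec::new();
        // Drop queues that are already empty so every weight maps to real work.
        queues.retain(|(_, q)| !q.is_empty());
        if queues.is_empty() {
            return out;
        }
        let rng = &mut thread_rng();
        // Floor each weight at 1, as the repair code does for unvoted slots.
        let mut weights: Vec<u64> = queues.iter().map(|(w, _)| (*w).max(1)).collect();
        let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
        while out.len() < max_items && !queues.is_empty() {
            // Pick a queue with probability proportional to its weight.
            let index = weighted_index.sample(rng);
            out.push(queues[index].1.remove(0));
            if queues[index].1.is_empty() {
                // Remove the exhausted queue and rebuild the sampler.
                queues.remove(index);
                weights.remove(index);
                if weights.is_empty() {
                    break;
                }
                weighted_index = WeightedIndex::new(weights.clone()).unwrap();
            }
        }
        out
    }

With weights of, say, 90 and 10, roughly nine out of ten draws come from the heavier queue until it empties, which is how the repair loop favors heavily voted slots without starving the rest of the level.
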
@@ -633,8 +678,10 @@ mod test {
             let (shreds2, _) = make_slot_entries(5, 2, 1);
             shreds.extend(shreds2);
             blockstore.insert_shreds(shreds, None, false).unwrap();
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
             );
         }
@@ -654,9 +701,11 @@ mod test {
             // any shreds for
             blockstore.insert_shreds(shreds, None, false).unwrap();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             // Check that repair tries to patch the empty slot
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0)]
             );
         }
@@ -701,9 +750,16 @@ mod test {
                 })
                 .collect();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
-                    .unwrap(),
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
+                .unwrap(),
                 expected
             );
 
@@ -712,7 +768,8 @@ mod test {
                     &blockstore,
                     0,
                     expected.len() - 2,
-                    &HashMap::new()
+                    &HashMap::new(),
+                    &vote_tracker,
                 )
                 .unwrap()[..],
                 expected[0..expected.len() - 2]
@@ -721,6 +778,55 @@ mod test {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    pub fn test_repairs_distributed_across_slots() {
+        solana_logger::setup();
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+
+            let num_entries_per_slot = 100;
+
+            // Create some shreds
+            for i in 1..10 {
+                let (shreds, _) = make_slot_entries(i, 0, num_entries_per_slot as u64);
+
+                // Only insert the first shred
+                blockstore
+                    .insert_shreds(shreds[..1].to_vec(), None, false)
+                    .unwrap();
+            }
+
+            let vote_tracker = Arc::new(VoteTracker::default());
+            let repairs = RepairService::generate_repairs(
+                &blockstore,
+                0,
+                num_entries_per_slot,
+                &HashMap::new(),
+                &vote_tracker,
+            )
+            .unwrap();
+            let mut repairs_slots = HashMap::new();
+            for repair in repairs {
+                match repair {
+                    RepairType::Shred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::HighestShred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::Orphan(slot) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                }
+            }
+            for i in 1..10 {
+                assert!(repairs_slots.contains_key(&i));
+            }
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_generate_highest_repair() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -742,9 +848,16 @@ mod test {
             let expected: Vec<RepairType> =
                 vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
-                    .unwrap(),
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
+                .unwrap(),
                 expected
             );
         }
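
The new test_repairs_distributed_across_slots above inserts only the first shred for each of slots 1 through 9 (all children of slot 0), so every slot still needs repairs, then generates at most num_entries_per_slot repairs with a default, empty VoteTracker. Every slot therefore falls back to a weight of 1, and the assertion checks that the weighted sampling still produces at least one repair request for each slot rather than skipping any of them.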
@@ -1,5 +1,6 @@
 //! The `retransmit_stage` retransmits shreds between validators
 
+use crate::cluster_info_vote_listener::VoteTracker;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_slots::ClusterSlots,
@@ -413,6 +414,7 @@ impl RetransmitStage {
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -457,6 +459,7 @@ impl RetransmitStage {
                 rv && is_connected
             },
             cluster_slots,
+            vote_tracker,
         );
 
         let thread_hdls = t_retransmit;

@@ -145,6 +145,7 @@ impl Tvu {
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
+            vote_tracker.clone(),
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();

@@ -3,6 +3,7 @@
 //!
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     repair_response,
     repair_service::{RepairInfo, RepairService},
@@ -300,6 +301,7 @@ impl WindowService {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> WindowService
     where
         F: 'static
@@ -316,6 +318,7 @@ impl WindowService {
             cluster_info.clone(),
             repair_info,
             cluster_slots,
+            vote_tracker,
        );
 
         let (insert_sender, insert_receiver) = unbounded();