Weight repair slots based on vote stake (#10741) (#10746)

* Weight repair slots based on vote stake

* Add test

(cherry picked from commit cabd0a09c3)

Co-authored-by: sakridge <sakridge@gmail.com>
mergify[bot] authored on 2020-06-23 04:48:32 +00:00, committed by GitHub
parent 5f80c1d37d
commit 8865bfbd59

5 changed files with 147 additions and 27 deletions
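The idea, in isolation: repair requests for slots at the same depth of a fork are drawn at random, with each slot's probability proportional to the vote stake observed for it. Below is a minimal, self-contained sketch of that sampling step; the slot numbers and stake values are made up for illustration, and only the `rand`/`rand_chacha` usage mirrors the patch.

```rust
use rand::distributions::{Distribution, WeightedIndex};
use rand::{thread_rng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;

fn main() {
    // Hypothetical slots at one fork level and the vote stake seen for each;
    // slots with no observed stake get weight 1 so they remain repairable.
    let slots: Vec<u64> = vec![10, 11, 12];
    let stakes: Vec<u64> = vec![500, 1, 250];

    // Seed a ChaCha RNG from the thread RNG, as the patch does.
    let mut seed = [0u8; 32];
    thread_rng().fill(&mut seed);
    let rng = &mut ChaChaRng::from_seed(seed);

    // Sample slots in proportion to stake: slot 10 is drawn roughly two
    // thirds of the time, slot 11 almost never.
    let weighted_index = WeightedIndex::new(&stakes).unwrap();
    for _ in 0..5 {
        let i = weighted_index.sample(rng);
        println!("request repair for slot {} (stake {})", slots[i], stakes[i]);
    }
}
```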

cluster_info_vote_listener.rs

@@ -49,7 +49,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
 pub struct SlotVoteTracker {
     voted: HashSet<Arc<Pubkey>>,
     updates: Option<Vec<Arc<Pubkey>>>,
-    total_stake: u64,
+    pub total_stake: u64,
 }
 
 impl SlotVoteTracker {
@@ -62,7 +62,7 @@ impl SlotVoteTracker {
 #[derive(Default)]
 pub struct VoteTracker {
     // Map from a slot to a set of validators who have voted for that slot
-    slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
+    pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
     // Don't track votes from people who are not staked, acts as a spam filter
     epoch_authorized_voters: RwLock<HashMap<Epoch, Arc<EpochAuthorizedVoters>>>,
     leader_schedule_epoch: RwLock<Epoch>,
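These visibility changes exist so that code outside the vote listener (the repair path below) can read the per-slot stake totals. A rough sketch of that access pattern, using simplified stand-in types rather than the real crate definitions:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins for the types in the hunk above (sketch only).
type Slot = u64;
struct SlotVoteTracker {
    pub total_stake: u64,
}
#[derive(Default)]
struct VoteTracker {
    pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
}

// Read the aggregate stake recorded for `slot`, defaulting to 0 when no
// votes have been observed for it.
fn observed_stake(vote_tracker: &VoteTracker, slot: Slot) -> u64 {
    vote_tracker
        .slot_vote_trackers
        .read()
        .unwrap()
        .get(&slot)
        .map(|tracker| tracker.read().unwrap().total_stake)
        .unwrap_or(0)
}

fn main() {
    let tracker = VoteTracker::default();
    tracker
        .slot_vote_trackers
        .write()
        .unwrap()
        .insert(7, Arc::new(RwLock::new(SlotVoteTracker { total_stake: 42 })));
    assert_eq!(observed_stake(&tracker, 7), 42);
    assert_eq!(observed_stake(&tracker, 8), 0);
}
```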

repair_service.rs

@@ -2,12 +2,16 @@
 //! regularly finds missing shreds in the ledger and sends repair requests for those shreds
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     commitment::VOTE_THRESHOLD_SIZE,
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use rand::distributions::{Distribution, WeightedIndex};
+use rand::{thread_rng, Rng, SeedableRng};
+use rand_chacha::ChaChaRng;
 use solana_ledger::{
     bank_forks::BankForks,
     blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
@@ -114,6 +118,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
@@ -125,6 +130,7 @@
                     cluster_info,
                     repair_info,
                     &cluster_slots,
+                    vote_tracker,
                 )
             })
             .unwrap();
@@ -139,6 +145,7 @@
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: &ClusterSlots,
+        vote_tracker: Arc<VoteTracker>,
     ) {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.id();
@@ -191,6 +198,7 @@
                     root_bank.slot(),
                     MAX_REPAIR_LENGTH,
                     &duplicate_slot_repair_statuses,
+                    &vote_tracker,
                 )
             };
 
@@ -272,6 +280,7 @@
         root: Slot,
         max_repairs: usize,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) -> Result<Vec<RepairType>> {
         // Slot height and shred indexes for shreds we want to repair
         let mut repairs: Vec<RepairType> = vec![];
@@ -281,10 +290,9 @@
             max_repairs,
             root,
             duplicate_slot_repair_statuses,
+            vote_tracker,
         );
 
-        // TODO: Incorporate gossip to determine priorities for repair?
-
         // Try to resolve orphans in blockstore
         let orphans = blockstore.orphans_iterator(root + 1).unwrap();
         Self::generate_repairs_for_orphans(orphans, &mut repairs);
@@ -533,27 +541,64 @@
         max_repairs: usize,
         slot: Slot,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) {
+        let mut seed = [0u8; 32];
+        thread_rng().fill(&mut seed);
+        let rng = &mut ChaChaRng::from_seed(seed);
         let mut pending_slots = vec![slot];
         while repairs.len() < max_repairs && !pending_slots.is_empty() {
-            let slot = pending_slots.pop().unwrap();
-            if duplicate_slot_repair_statuses.contains_key(&slot) {
-                // These are repaired through a different path
-                continue;
-            }
-            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
-                let new_repairs = Self::generate_repairs_for_slot(
-                    blockstore,
-                    slot,
-                    &slot_meta,
-                    max_repairs - repairs.len(),
-                );
-                repairs.extend(new_repairs);
-                let next_slots = slot_meta.next_slots;
-                pending_slots.extend(next_slots);
-            } else {
-                break;
-            }
+            pending_slots.retain(|slot| !duplicate_slot_repair_statuses.contains_key(slot));
+            let mut next_pending_slots = vec![];
+            let mut level_repairs = HashMap::new();
+            for slot in &pending_slots {
+                if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
+                    let new_repairs = Self::generate_repairs_for_slot(
+                        blockstore,
+                        *slot,
+                        &slot_meta,
+                        std::usize::MAX,
+                    );
+                    if !new_repairs.is_empty() {
+                        level_repairs.insert(*slot, new_repairs);
+                    }
+                    next_pending_slots.extend(slot_meta.next_slots);
+                }
+            }
+
+            if !level_repairs.is_empty() {
+                let mut slots_to_repair: Vec<_> = level_repairs.keys().cloned().collect();
+                let mut weights: Vec<_> = {
+                    let r_vote_tracker = vote_tracker.slot_vote_trackers.read().unwrap();
+                    slots_to_repair
+                        .iter()
+                        .map(|slot| {
+                            if let Some(slot_vote_tracker) = r_vote_tracker.get(slot) {
+                                std::cmp::max(slot_vote_tracker.read().unwrap().total_stake, 1)
+                            } else {
+                                // should it be something else?
+                                1
+                            }
+                        })
+                        .collect()
+                };
+
+                let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                while repairs.len() < max_repairs && !level_repairs.is_empty() {
+                    let index = weighted_index.sample(rng);
+                    let slot_repairs = level_repairs.get_mut(&slots_to_repair[index]).unwrap();
+                    repairs.push(slot_repairs.remove(0));
+                    if slot_repairs.is_empty() {
+                        level_repairs.remove(&slots_to_repair[index]);
+                        slots_to_repair.remove(index);
+                        weights.remove(index);
+                        if !weights.is_empty() {
+                            weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                        }
+                    }
+                }
+            }
+
+            pending_slots = next_pending_slots;
         }
     }
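The rewritten loop above walks the fork level by level: each pass collects every outstanding repair for the slots at the current depth, then draws individual repairs at random with probability proportional to the vote stake recorded for each slot, falling back to a weight of 1 for slots with no observed votes so they still get repaired. Once a slot's repair list is drained it is removed and the `WeightedIndex` is rebuilt from the remaining weights, so the `max_repairs` budget is spent preferentially on the slots the cluster has the most stake behind.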
@@ -640,8 +685,10 @@ mod test {
             let (shreds2, _) = make_slot_entries(5, 2, 1);
             shreds.extend(shreds2);
             blockstore.insert_shreds(shreds, None, false).unwrap();
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
             );
         }
@@ -661,9 +708,11 @@ mod test {
             // any shreds for
             blockstore.insert_shreds(shreds, None, false).unwrap();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             // Check that repair tries to patch the empty slot
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0)]
             );
         }
@@ -708,8 +757,15 @@ mod test {
                 })
                 .collect();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
                 .unwrap(),
                 expected
             );
@@ -719,7 +775,8 @@ mod test {
                     &blockstore,
                     0,
                     expected.len() - 2,
-                    &HashMap::new()
+                    &HashMap::new(),
+                    &vote_tracker,
                 )
                 .unwrap()[..],
                 expected[0..expected.len() - 2]
@@ -728,6 +785,55 @@ mod test {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    pub fn test_repairs_distributed_across_slots() {
+        solana_logger::setup();
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            let num_entries_per_slot = 100;
+
+            // Create some shreds
+            for i in 1..10 {
+                let (shreds, _) = make_slot_entries(i, 0, num_entries_per_slot as u64);
+
+                // Only insert the first shred
+                blockstore
+                    .insert_shreds(shreds[..1].to_vec(), None, false)
+                    .unwrap();
+            }
+
+            let vote_tracker = Arc::new(VoteTracker::default());
+
+            let repairs = RepairService::generate_repairs(
+                &blockstore,
+                0,
+                num_entries_per_slot,
+                &HashMap::new(),
+                &vote_tracker,
+            )
+            .unwrap();
+            let mut repairs_slots = HashMap::new();
+            for repair in repairs {
+                match repair {
+                    RepairType::Shred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::HighestShred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::Orphan(slot) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                }
+            }
+
+            for i in 1..10 {
+                assert!(repairs_slots.contains_key(&i));
+            }
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_generate_highest_repair() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -749,8 +855,15 @@ mod test {
             let expected: Vec<RepairType> =
                 vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
                 .unwrap(),
                 expected
             );

retransmit_stage.rs

@@ -1,5 +1,6 @@
 //! The `retransmit_stage` retransmits shreds between validators
+use crate::cluster_info_vote_listener::VoteTracker;
 
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_slots::ClusterSlots,
@@ -413,6 +414,7 @@ impl RetransmitStage {
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -457,6 +459,7 @@ impl RetransmitStage {
                 rv && is_connected
             },
             cluster_slots,
+            vote_tracker,
         );
 
         let thread_hdls = t_retransmit;

tvu.rs

@@ -146,6 +146,7 @@ impl Tvu {
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
+            vote_tracker.clone(),
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();

window_service.rs

@@ -3,6 +3,7 @@
 //!
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     repair_response,
     repair_service::{RepairInfo, RepairService},
@@ -301,6 +302,7 @@ impl WindowService {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> WindowService
     where
         F: 'static
@@ -317,6 +319,7 @@ impl WindowService {
             cluster_info.clone(),
             repair_info,
             cluster_slots,
+            vote_tracker,
         );
 
         let (insert_sender, insert_receiver) = unbounded();