Switch to using weighted repair in RepairService (#10735)

* Plumb votes into repair service

* Remove refactoring

* Fix tests

* Switch to using RepairWeight for generating repairs

* Revert "Weight repair slots based on vote stake (#10741)"

This reverts commit cabd0a09c3.

* Update logging

Co-authored-by: Carl <carl@solana.com>
Author:       carllin
Date:         2020-07-09 23:52:54 -06:00
Committed by: GitHub
Parent:       f1c1152948
Commit:       7a14e359d7
11 changed files with 820 additions and 388 deletions
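
Before the diff, the net effect of this change, condensed from the new repair loop below into one schematic fragment. Error handling, shutdown, and the send path are elided, and the surrounding service state (bank_forks, blockstore, the receiver, repair_timing) is assumed to be in scope; every call shown appears verbatim in the diff:

let mut repair_weight = RepairWeight::new(bank_forks.read().unwrap().root());
loop {
    let root_bank = bank_forks.read().unwrap().root_bank().clone();
    // Purge slots below the new root from the weighting tree.
    repair_weight.set_root(root_bank.slot());

    // Drain verified votes (non-blocking) into slot -> voting pubkeys.
    let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
    for (vote_pubkey, vote) in verified_vote_receiver.try_iter() {
        for slot in vote.slots {
            slot_to_vote_pubkeys.entry(slot).or_default().push(vote_pubkey);
        }
    }

    // Fold the votes into the tree, weighted by epoch stakes.
    repair_weight.add_votes(
        &blockstore,
        slot_to_vote_pubkeys.into_iter(),
        root_bank.epoch_stakes_map(),
        root_bank.epoch_schedule(),
    );

    // Ask the tree for the best repairs; unlike the old
    // generate_repairs, this cannot fail.
    let repairs = repair_weight.get_best_weighted_repairs(
        &blockstore,
        root_bank.epoch_stakes_map(),
        root_bank.epoch_schedule(),
        MAX_ORPHANS,
        MAX_REPAIR_LENGTH,
        &duplicate_slot_repair_statuses,
        Some(&mut repair_timing),
    );
    // ...resolve peers and send each request via serve_repair...
}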


@@ -2,16 +2,14 @@
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
repair_weight::RepairWeight,
repair_weighted_traversal::Contains,
result::Result,
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rand::distributions::{Distribution, WeightedIndex};
use rand::{thread_rng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
shred::Nonce,
@@ -77,23 +75,31 @@ pub struct RepairStats {
#[derive(Default, Debug)]
pub struct RepairTiming {
pub set_root_elapsed: u64,
pub get_votes_elapsed: u64,
pub add_votes_elapsed: u64,
pub lowest_slot_elapsed: u64,
pub update_completed_slots_elapsed: u64,
pub generate_repairs_elapsed: u64,
pub get_best_orphans_elapsed: u64,
pub get_best_shreds_elapsed: u64,
pub send_repairs_elapsed: u64,
}
impl RepairTiming {
fn update(
&mut self,
set_root_elapsed: u64,
get_votes_elapsed: u64,
add_votes_elapsed: u64,
lowest_slot_elapsed: u64,
update_completed_slots_elapsed: u64,
generate_repairs_elapsed: u64,
send_repairs_elapsed: u64,
) {
self.set_root_elapsed += set_root_elapsed;
self.get_votes_elapsed += get_votes_elapsed;
self.add_votes_elapsed += add_votes_elapsed;
self.lowest_slot_elapsed += lowest_slot_elapsed;
self.update_completed_slots_elapsed += update_completed_slots_elapsed;
self.generate_repairs_elapsed += generate_repairs_elapsed;
self.send_repairs_elapsed += send_repairs_elapsed;
}
}
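
Note the asymmetry in RepairTiming: update() covers only the phases the loop measures directly, while the new get_best_orphans_elapsed and get_best_shreds_elapsed fields are not parameters here. They are filled in by RepairWeight itself through the Some(&mut repair_timing) argument the loop passes to get_best_weighted_repairs later in this diff. A hypothetical sketch of that callee-side accounting (the real bookkeeping lives in repair_weight.rs, which is not shown in this hunk):

fn get_best_weighted_repairs(/* ... */ repair_timing: Option<&mut RepairTiming>) -> Vec<RepairType> {
    let mut repairs = Vec::new();

    let mut get_best_orphans_elapsed = Measure::start("get_best_orphans");
    // ...push orphan repairs from the weighted fork tree...
    get_best_orphans_elapsed.stop();

    let mut get_best_shreds_elapsed = Measure::start("get_best_shreds");
    // ...push shred repairs along the heaviest forks...
    get_best_shreds_elapsed.stop();

    if let Some(timing) = repair_timing {
        timing.get_best_orphans_elapsed += get_best_orphans_elapsed.as_us();
        timing.get_best_shreds_elapsed += get_best_shreds_elapsed.as_us();
    }
    repairs
}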
@@ -143,7 +149,7 @@ impl RepairService {
cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
cluster_slots: Arc<ClusterSlots>,
vote_tracker: Arc<VoteTracker>,
verified_vote_receiver: VerifiedVoteReceiver,
) -> Self {
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
@@ -155,7 +161,7 @@
cluster_info,
repair_info,
&cluster_slots,
vote_tracker,
verified_vote_receiver,
)
})
.unwrap();
@@ -170,15 +176,17 @@
cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
cluster_slots: &ClusterSlots,
vote_tracker: Arc<VoteTracker>,
verified_vote_receiver: VerifiedVoteReceiver,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(cluster_info.clone());
let id = cluster_info.id();
Self::initialize_lowest_slot(id, blockstore, &cluster_info);
let mut repair_stats = RepairStats::default();
let mut repair_timing = RepairTiming::default();
let mut last_stats = Instant::now();
let duplicate_slot_repair_statuses = HashMap::new();
let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
HashMap::new();
Self::initialize_epoch_slots(
blockstore,
@@ -190,12 +198,44 @@ impl RepairService {
break;
}
let mut set_root_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;
let mut lowest_slot_elapsed;
let mut update_completed_slots_elapsed;
let mut generate_repairs_elapsed;
let repairs = {
let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
let new_root = root_bank.slot();
// Purge outdated slots from the weighting heuristic
set_root_elapsed = Measure::start("set_root_elapsed");
repair_weight.set_root(new_root);
set_root_elapsed.stop();
// Add new votes to the weighting heuristic
get_votes_elapsed = Measure::start("get_votes_elapsed");
let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
verified_vote_receiver
.try_iter()
.for_each(|(vote_pubkey, vote)| {
for slot in vote.slots {
slot_to_vote_pubkeys
.entry(slot)
.or_default()
.push(vote_pubkey);
}
});
get_votes_elapsed.stop();
add_votes_elapsed = Measure::start("add_votes");
repair_weight.add_votes(
&blockstore,
slot_to_vote_pubkeys.into_iter(),
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
);
add_votes_elapsed.stop();
lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
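
The VerifiedVoteReceiver handle replaces the shared VoteTracker: rather than reading vote state out of a lock-protected map, the loop above drains a channel of already-verified votes once per iteration, and try_iter keeps that drain non-blocking (an empty channel simply yields an empty iterator). The alias itself is defined in cluster_info_vote_listener.rs, outside this diff; from the |(vote_pubkey, vote)| destructuring and vote.slots above, a plausible shape is:

use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_sdk::pubkey::Pubkey;
use solana_vote_program::vote_state::Vote;

// Assumed shape, inferred from the usage above; not shown in this diff.
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vote)>;
pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vote)>;

// The vote listener keeps the sender; RepairService::new takes the receiver.
let (verified_vote_sender, verified_vote_receiver) = unbounded::<(Pubkey, Vote)>();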
@@ -228,40 +268,39 @@ impl RepairService {
&repair_socket,
);*/
generate_repairs_elapsed = Measure::start("generate_repairs_elapsed");
let repairs = Self::generate_repairs(
repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.slot(),
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
&duplicate_slot_repair_statuses,
&vote_tracker,
);
generate_repairs_elapsed.stop();
repairs
Some(&mut repair_timing),
)
};
let mut cache = HashMap::new();
let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
if let Ok(repairs) = repairs {
let mut cache = HashMap::new();
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
&cluster_slots,
repair_request,
&mut cache,
&mut repair_stats,
) {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
});
}
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
&cluster_slots,
repair_request,
&mut cache,
&mut repair_stats,
) {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
});
send_repairs_elapsed.stop();
repair_timing.update(
set_root_elapsed.as_us(),
get_votes_elapsed.as_us(),
add_votes_elapsed.as_us(),
lowest_slot_elapsed.as_us(),
update_completed_slots_elapsed.as_us(),
generate_repairs_elapsed.as_us(),
send_repairs_elapsed.as_us(),
);
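
One structural consequence visible in this hunk: the old Self::generate_repairs returned Result<Vec<RepairType>>, so the send loop had to sit inside if let Ok(repairs); get_best_weighted_repairs returns Vec<RepairType> outright, so the peer cache moves up and the send loop flattens. Schematically (arguments elided):

// Before: fallible generation, conditional send.
if let Ok(repairs) = Self::generate_repairs(/* ... */) {
    let mut cache = HashMap::new();
    repairs.into_iter().for_each(|repair_request| { /* send */ });
}

// After: infallible generation, unconditional send.
let repairs = repair_weight.get_best_weighted_repairs(/* ... */);
let mut cache = HashMap::new();
repairs.into_iter().for_each(|repair_request| { /* send */ });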
@@ -283,23 +322,31 @@ impl RepairService {
}
datapoint_info!(
"serve_repair-repair-timing",
("set-root-elapsed", repair_timing.set_root_elapsed, i64),
("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
(
"lowest_slot_elapsed",
"get-best-orphans-elapsed",
repair_timing.get_best_orphans_elapsed,
i64
),
(
"get-best-shreds-elapsed",
repair_timing.get_best_shreds_elapsed,
i64
),
(
"lowest-slot-elapsed",
repair_timing.lowest_slot_elapsed,
i64
),
(
"update_completed_slots_elapsed",
"update-completed-slots-elapsed",
repair_timing.update_completed_slots_elapsed,
i64
),
(
"generate_repairs_elapsed",
repair_timing.generate_repairs_elapsed,
i64
),
(
"send_repairs_elapsed",
"send-repairs-elapsed",
repair_timing.send_repairs_elapsed,
i64
),
@@ -400,31 +447,6 @@ impl RepairService {
}
}
fn generate_repairs(
blockstore: &Blockstore,
root: Slot,
max_repairs: usize,
duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
vote_tracker: &Arc<VoteTracker>,
) -> Result<Vec<RepairType>> {
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
Self::generate_repairs_by_level(
blockstore,
&mut repairs,
max_repairs,
root,
duplicate_slot_repair_statuses,
vote_tracker,
);
// Try to resolve orphans in blockstore
let orphans = blockstore.orphans_iterator(root + 1).unwrap();
Self::generate_repairs_for_orphans(orphans, &mut repairs);
Ok(repairs)
}
#[allow(dead_code)]
fn generate_duplicate_repairs_for_slot(
blockstore: &Blockstore,
@@ -628,81 +650,6 @@ impl RepairService {
.collect()
}
fn generate_repairs_for_orphans(
orphans: impl Iterator<Item = u64>,
repairs: &mut Vec<RepairType>,
) {
repairs.extend(orphans.take(MAX_ORPHANS).map(RepairType::Orphan));
}
/// Repairs any fork starting at the input slot
fn generate_repairs_by_level(
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
vote_tracker: &Arc<VoteTracker>,
) {
let mut seed = [0u8; 32];
thread_rng().fill(&mut seed);
let rng = &mut ChaChaRng::from_seed(seed);
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
pending_slots.retain(|slot| !duplicate_slot_repair_statuses.contains_key(slot));
let mut next_pending_slots = vec![];
let mut level_repairs = HashMap::new();
for slot in &pending_slots {
if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
blockstore,
*slot,
&slot_meta,
std::usize::MAX,
);
if !new_repairs.is_empty() {
level_repairs.insert(*slot, new_repairs);
}
next_pending_slots.extend(slot_meta.next_slots);
}
}
if !level_repairs.is_empty() {
let mut slots_to_repair: Vec<_> = level_repairs.keys().cloned().collect();
let mut weights: Vec<_> = {
let r_vote_tracker = vote_tracker.slot_vote_trackers.read().unwrap();
slots_to_repair
.iter()
.map(|slot| {
if let Some(slot_vote_tracker) = r_vote_tracker.get(slot) {
std::cmp::max(slot_vote_tracker.read().unwrap().total_stake, 1)
} else {
// should it be something else?
1
}
})
.collect()
};
let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
while repairs.len() < max_repairs && !level_repairs.is_empty() {
let index = weighted_index.sample(rng);
let slot_repairs = level_repairs.get_mut(&slots_to_repair[index]).unwrap();
repairs.push(slot_repairs.remove(0));
if slot_repairs.is_empty() {
level_repairs.remove(&slots_to_repair[index]);
slots_to_repair.remove(index);
weights.remove(index);
if !weights.is_empty() {
weighted_index = WeightedIndex::new(weights.clone()).unwrap();
}
}
}
}
pending_slots = next_pending_slots;
}
}
fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
// Safe to set into gossip because by this time, the leader schedule cache should
// also be updated with the latest root (done in blockstore_processor) and thus
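
The generate_repairs_by_level deleted above is the stake-weighted sampling introduced by #10741 and reverted by this commit: at each level of the fork tree it drew slots at random with probability proportional to observed vote stake, rebuilding the distribution as slots were exhausted. The core technique, reduced to a self-contained sketch (stake numbers are made-up illustration values; the removed code read them from VoteTracker and clamped unseen slots to a minimum weight of 1):

use rand::distributions::{Distribution, WeightedIndex};
use rand::thread_rng;

fn main() {
    let slots: Vec<u64> = vec![10, 11, 12];
    let stakes: Vec<u64> = vec![500, 1, 300]; // illustration only
    let dist = WeightedIndex::new(&stakes).unwrap();
    let mut rng = thread_rng();
    // Heavier-staked slots are picked (and thus repaired) first,
    // but lightly staked slots still have a nonzero chance each draw.
    let next_slot_to_repair = slots[dist.sample(&mut rng)];
    println!("repair slot {}", next_slot_to_repair);
}

RepairWeight replaces this per-level randomized sampling with a deterministic, stake-weighted traversal of the fork tree (hence the repair_weighted_traversal import at the top of the file), which is what lets both the sampler and its Result-returning wrapper disappear.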
@@ -774,6 +721,7 @@ mod test {
use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs};
use solana_sdk::signature::Signer;
use solana_vote_program::vote_transaction;
use std::collections::HashSet;
#[test]
pub fn test_repair_orphan() {
@@ -786,11 +734,18 @@ mod test {
let (shreds2, _) = make_slot_entries(5, 2, 1);
shreds.extend(shreds2);
blockstore.insert_shreds(shreds, None, false).unwrap();
let vote_tracker = Arc::new(VoteTracker::default());
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
.unwrap(),
vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
&HashSet::default(),
None
),
vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
);
}
@@ -808,12 +763,19 @@ mod test {
// Write this shred to slot 2, should chain to slot 0, which we haven't received
// any shreds for
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut repair_weight = RepairWeight::new(0);
let vote_tracker = Arc::new(VoteTracker::default());
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
.unwrap(),
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
&HashSet::default(),
None
),
vec![RepairType::HighestShred(0, 0)]
);
}
@@ -858,83 +820,36 @@ mod test {
})
.collect();
let vote_tracker = Arc::new(VoteTracker::default());
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
RepairService::generate_repairs(
repair_weight.get_best_weighted_repairs(
&blockstore,
0,
std::usize::MAX,
&HashMap::new(),
&vote_tracker
)
.unwrap(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
&HashSet::default(),
None
),
expected
);
assert_eq!(
RepairService::generate_repairs(
repair_weight.get_best_weighted_repairs(
&blockstore,
0,
expected.len() - 2,
&HashMap::new(),
&vote_tracker,
)
.unwrap()[..],
&EpochSchedule::default(),
MAX_ORPHANS,
expected.len() - 2,
&HashSet::default(),
None
)[..],
expected[0..expected.len() - 2]
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repairs_distributed_across_slots() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries_per_slot = 100;
// Create some shreds
for i in 1..10 {
let (shreds, _) = make_slot_entries(i, 0, num_entries_per_slot as u64);
// Only insert the first shred
blockstore
.insert_shreds(shreds[..1].to_vec(), None, false)
.unwrap();
}
let vote_tracker = Arc::new(VoteTracker::default());
let repairs = RepairService::generate_repairs(
&blockstore,
0,
num_entries_per_slot,
&HashMap::new(),
&vote_tracker,
)
.unwrap();
let mut repairs_slots = HashMap::new();
for repair in repairs {
match repair {
RepairType::Shred(slot, _shred_index) => {
*repairs_slots.entry(slot).or_insert(0) += 1;
}
RepairType::HighestShred(slot, _shred_index) => {
*repairs_slots.entry(slot).or_insert(0) += 1;
}
RepairType::Orphan(slot) => {
*repairs_slots.entry(slot).or_insert(0) += 1;
}
}
}
for i in 1..10 {
assert!(repairs_slots.contains_key(&i));
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_generate_highest_repair() {
let blockstore_path = get_tmp_ledger_path!();
@@ -956,16 +871,17 @@ mod test {
let expected: Vec<RepairType> =
vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
let vote_tracker = Arc::new(VoteTracker::default());
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
RepairService::generate_repairs(
repair_weight.get_best_weighted_repairs(
&blockstore,
0,
std::usize::MAX,
&HashMap::new(),
&vote_tracker
)
.unwrap(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
&HashSet::default(),
None
),
expected
);
}