Add weighted traversal (#10877)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin
2020-07-02 14:33:04 -07:00
committed by GitHub
parent bcf36cbf18
commit f17ac70bb2
4 changed files with 495 additions and 28 deletions

View File

@ -4,6 +4,7 @@ use crate::{
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
repair_weighted_traversal::Contains,
result::Result,
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
@ -273,6 +274,61 @@ impl RepairService {
Ok(repairs)
}
pub fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<RepairType> {
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
vec![RepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blockstore.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
slot_meta.consumed,
slot_meta.received,
max_repairs,
);
reqs.into_iter()
.map(|i| RepairType::Shred(slot, i))
.collect()
}
}
/// Repairs any fork starting at the input slot
pub fn generate_repairs_for_fork(
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &dyn Contains<Slot>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if duplicate_slot_repair_statuses.contains(&slot) {
// These are repaired through a different path
continue;
}
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
blockstore,
slot,
&slot_meta,
max_repairs - repairs.len(),
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
pending_slots.extend(next_slots);
} else {
break;
}
}
}
fn generate_repairs(
blockstore: &Blockstore,
root: Slot,
@ -282,7 +338,7 @@ impl RepairService {
) -> Result<Vec<RepairType>> {
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
Self::generate_repairs_for_fork(
Self::generate_repairs_by_level(
blockstore,
&mut repairs,
max_repairs,
@ -501,30 +557,6 @@ impl RepairService {
.collect()
}
fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<RepairType> {
if slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
vec![RepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blockstore.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
slot_meta.consumed,
slot_meta.received,
max_repairs,
);
reqs.into_iter()
.map(|i| RepairType::Shred(slot, i))
.collect()
}
}
fn generate_repairs_for_orphans(
orphans: impl Iterator<Item = u64>,
repairs: &mut Vec<RepairType>,
@ -533,7 +565,7 @@ impl RepairService {
}
/// Repairs any fork starting at the input slot
fn generate_repairs_for_fork(
fn generate_repairs_by_level(
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_repairs: usize,