@ -325,9 +325,14 @@ impl ClusterInfoRepairListener {
|
||||
if slot > my_root
|
||||
|| num_slots_repaired >= num_slots_to_repair
|
||||
|| slot > max_confirmed_repairee_slot
|
||||
// Don't repair if the next rooted slot jumps, because that means
|
||||
// we started from a snapshot and don't have the immediate next
|
||||
// slot that the repairee needs
|
||||
|| slot_meta.is_none()
|
||||
{
|
||||
break;
|
||||
}
|
||||
let slot_meta = slot_meta.unwrap();
|
||||
if !repairee_epoch_slots.slots.contains(&slot) {
|
||||
// Calculate the shred indexes this node is responsible for repairing. Note that
|
||||
// because we are only repairing slots that are before our root, the slot.received
|
||||
@ -338,7 +343,7 @@ impl ClusterInfoRepairListener {
|
||||
// the cluster
|
||||
let num_shreds_in_slot = slot_meta.received as usize;
|
||||
|
||||
// Check if I'm responsible for repairing this slots
|
||||
// Check if I'm responsible for repairing this slot
|
||||
if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
|
||||
my_pubkey,
|
||||
&eligible_repairmen,
|
||||
|
@ -424,6 +424,16 @@ impl Blocktree {
|
||||
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
|
||||
}
|
||||
|
||||
pub fn rooted_slot_iterator<'a>(
|
||||
&'a self,
|
||||
slot: Slot,
|
||||
) -> Result<impl Iterator<Item = u64> + 'a> {
|
||||
let slot_iterator = self
|
||||
.db
|
||||
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Forward))?;
|
||||
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
|
||||
}
|
||||
|
||||
fn try_shred_recovery(
|
||||
db: &Database,
|
||||
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
|
||||
|
@ -5,6 +5,7 @@ use solana_sdk::clock::Slot;
|
||||
|
||||
pub struct RootedSlotIterator<'a> {
|
||||
next_slots: Vec<Slot>,
|
||||
prev_root: Slot,
|
||||
blocktree: &'a Blocktree,
|
||||
}
|
||||
|
||||
@ -13,6 +14,7 @@ impl<'a> RootedSlotIterator<'a> {
|
||||
if blocktree.is_root(start_slot) {
|
||||
Ok(Self {
|
||||
next_slots: vec![start_slot],
|
||||
prev_root: start_slot,
|
||||
blocktree,
|
||||
})
|
||||
} else {
|
||||
@ -21,34 +23,54 @@ impl<'a> RootedSlotIterator<'a> {
|
||||
}
|
||||
}
|
||||
impl<'a> Iterator for RootedSlotIterator<'a> {
|
||||
type Item = (Slot, SlotMeta);
|
||||
type Item = (Slot, Option<SlotMeta>);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// Clone b/c passing the closure to the map below requires exclusive access to
|
||||
// `self`, which is borrowed here if we don't clone.
|
||||
let rooted_slot = self
|
||||
let (rooted_slot, slot_skipped) = self
|
||||
.next_slots
|
||||
.iter()
|
||||
.find(|x| self.blocktree.is_root(**x))
|
||||
.cloned();
|
||||
|
||||
rooted_slot
|
||||
.map(|rooted_slot| {
|
||||
let slot_meta = self
|
||||
.map(|x| (Some(*x), false))
|
||||
.unwrap_or_else(|| {
|
||||
let mut iter = self
|
||||
.blocktree
|
||||
.meta(rooted_slot)
|
||||
.expect("Database failure, couldnt fetch SlotMeta");
|
||||
.rooted_slot_iterator(
|
||||
// First iteration the root always exists as guaranteed by the constructor,
|
||||
// so this unwrap_or_else cases won't be hit. Every subsequent iteration
|
||||
// of this iterator must thereafter have a valid `prev_root`
|
||||
self.prev_root,
|
||||
)
|
||||
.expect("Database failure, couldn't fetch rooted slots iterator");
|
||||
iter.next();
|
||||
(iter.next(), true)
|
||||
});
|
||||
|
||||
if slot_meta.is_none() {
|
||||
warn!("Rooted SlotMeta was deleted in between checking is_root and fetch");
|
||||
}
|
||||
|
||||
slot_meta.map(|slot_meta| {
|
||||
self.next_slots = slot_meta.next_slots.clone();
|
||||
(rooted_slot, slot_meta)
|
||||
})
|
||||
let slot_meta = rooted_slot
|
||||
.map(|r| {
|
||||
self.blocktree
|
||||
.meta(r)
|
||||
.expect("Database failure, couldnt fetch SlotMeta")
|
||||
})
|
||||
.unwrap_or(None)
|
||||
.unwrap_or(None);
|
||||
|
||||
if let Some(ref slot_meta) = slot_meta {
|
||||
self.next_slots = slot_meta.next_slots.clone();
|
||||
}
|
||||
|
||||
if slot_meta.is_none() && slot_skipped {
|
||||
warn!("Rooted SlotMeta was deleted in between checking is_root and fetch");
|
||||
}
|
||||
|
||||
rooted_slot.map(|r| {
|
||||
self.prev_root = r;
|
||||
if slot_skipped {
|
||||
(r, None)
|
||||
} else {
|
||||
(r, slot_meta)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,7 +91,7 @@ mod tests {
|
||||
|
||||
slot 0
|
||||
|
|
||||
slot 1 <-- set_root(true)
|
||||
slot 1 <-- set_root
|
||||
/ \
|
||||
slot 2 |
|
||||
/ |
|
||||
@ -134,4 +156,85 @@ mod tests {
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skipping_rooted_slot_iterator() {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||
let ticks_per_slot = 5;
|
||||
/*
|
||||
Build a blocktree in the ledger with the following fork structure:
|
||||
slot 0
|
||||
|
|
||||
slot 1
|
||||
|
|
||||
slot 2
|
||||
|
|
||||
slot 3 <-- set_root
|
||||
|
|
||||
SKIP (caused by a snapshot)
|
||||
|
|
||||
slot 10 <-- set_root
|
||||
|
|
||||
slot 11 <-- set_root
|
||||
*/
|
||||
|
||||
// Create pre-skip slots
|
||||
for slot in 0..=3 {
|
||||
let parent = {
|
||||
if slot == 0 {
|
||||
0
|
||||
} else {
|
||||
slot - 1
|
||||
}
|
||||
};
|
||||
fill_blocktree_slot_with_ticks(
|
||||
&blocktree,
|
||||
ticks_per_slot,
|
||||
slot,
|
||||
parent,
|
||||
Hash::default(),
|
||||
);
|
||||
}
|
||||
|
||||
// Set roots
|
||||
blocktree.set_roots(&[0, 1, 2, 3]).unwrap();
|
||||
|
||||
// Create one post-skip slot at 10, simulating starting from a snapshot
|
||||
// at 10
|
||||
blocktree.set_roots(&[10]).unwrap();
|
||||
// Try to get an iterator from before the skip. The post-skip slot
|
||||
// should not return a SlotMeta
|
||||
let result: Vec<_> = RootedSlotIterator::new(3, &blocktree)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(slot, meta)| (slot, meta.is_some()))
|
||||
.collect();
|
||||
let expected = vec![(3, true), (10, false)];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// Create one more post-skip slot at 11 with parent equal to 10
|
||||
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 11, 10, Hash::default());
|
||||
|
||||
// Set roots
|
||||
blocktree.set_roots(&[11]).unwrap();
|
||||
|
||||
let result: Vec<_> = RootedSlotIterator::new(0, &blocktree)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(slot, meta)| (slot, meta.is_some()))
|
||||
.collect();
|
||||
let expected = vec![
|
||||
(0, true),
|
||||
(1, true),
|
||||
(2, true),
|
||||
(3, true),
|
||||
(10, false),
|
||||
(11, true),
|
||||
];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user