optimizes and simplifies SlotMeta::completed_data_indexes ops (#21059)
SlotMeta::completed_data_indexes is defined as a Vec<u32> (https://github.com/solana-labs/solana/blob/a8d78e89d/ledger/src/blockstore_meta.rs#L31-L32), which results in inefficient updates (https://github.com/solana-labs/solana/blob/a8d78e89d/ledger/src/blockstore.rs#L3245-L3326). This commit changes the type to BTreeSet<u32> for more efficient and simpler updates and lookups. The change should be backward compatible because Vec<T> and BTreeSet<T> are both serialized as a seq; see https://github.com/serde-rs/serde/blob/ce0844b9e/serde/src/ser/impls.rs#L207-L208 and https://github.com/serde-rs/serde/blob/ce0844b9e/serde/src/ser/impls.rs#L216-L217
This commit is contained in:
		@@ -46,7 +46,7 @@ use {
 | 
			
		||||
        borrow::Cow,
 | 
			
		||||
        cell::RefCell,
 | 
			
		||||
        cmp,
 | 
			
		||||
        collections::{hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet},
 | 
			
		||||
        collections::{hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, HashMap, HashSet},
 | 
			
		||||
        convert::TryInto,
 | 
			
		||||
        fs,
 | 
			
		||||
        io::{Error as IoError, ErrorKind},
 | 
			
		||||
@@ -2826,7 +2826,7 @@ impl Blockstore {
 | 
			
		||||
        // Find all the ranges for the completed data blocks
 | 
			
		||||
        let completed_ranges = Self::get_completed_data_ranges(
 | 
			
		||||
            start_index as u32,
 | 
			
		||||
            &slot_meta.completed_data_indexes[..],
 | 
			
		||||
            &slot_meta.completed_data_indexes,
 | 
			
		||||
            slot_meta.consumed as u32,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@@ -2835,28 +2835,21 @@ impl Blockstore {
 | 
			
		||||
 | 
			
		||||
    // Get the range of indexes [start_index, end_index] of every completed data block
 | 
			
		||||
    fn get_completed_data_ranges(
 | 
			
		||||
        mut start_index: u32,
 | 
			
		||||
        completed_data_end_indexes: &[u32],
 | 
			
		||||
        start_index: u32,
 | 
			
		||||
        completed_data_indexes: &BTreeSet<u32>,
 | 
			
		||||
        consumed: u32,
 | 
			
		||||
    ) -> CompletedRanges {
 | 
			
		||||
        let mut completed_data_ranges = vec![];
 | 
			
		||||
        let floor = completed_data_end_indexes
 | 
			
		||||
            .iter()
 | 
			
		||||
            .position(|i| *i >= start_index)
 | 
			
		||||
            .unwrap_or_else(|| completed_data_end_indexes.len());
 | 
			
		||||
 | 
			
		||||
        for i in &completed_data_end_indexes[floor as usize..] {
 | 
			
		||||
        // `consumed` is the next missing shred index, but shred `i` existing in
 | 
			
		||||
        // completed_data_end_indexes implies it's not missing
 | 
			
		||||
            assert!(*i != consumed);
 | 
			
		||||
 | 
			
		||||
            if *i < consumed {
 | 
			
		||||
                completed_data_ranges.push((start_index, *i));
 | 
			
		||||
                start_index = *i + 1;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        completed_data_ranges
 | 
			
		||||
        assert!(!completed_data_indexes.contains(&consumed));
 | 
			
		||||
        completed_data_indexes
 | 
			
		||||
            .range(start_index..consumed)
 | 
			
		||||
            .scan(start_index, |begin, index| {
 | 
			
		||||
                let out = (*begin, *index);
 | 
			
		||||
                *begin = index + 1;
 | 
			
		||||
                Some(out)
 | 
			
		||||
            })
 | 
			
		||||
            .collect()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_entries_in_data_block(
 | 
			
		||||
@@ -3249,80 +3242,35 @@ fn update_completed_data_indexes(
 | 
			
		||||
    is_last_in_data: bool,
 | 
			
		||||
    new_shred_index: u32,
 | 
			
		||||
    received_data_shreds: &ShredIndex,
 | 
			
		||||
    // Sorted array of shred indexes marked data complete
 | 
			
		||||
    completed_data_indexes: &mut Vec<u32>,
 | 
			
		||||
    // Shred indices which are marked data complete.
 | 
			
		||||
    completed_data_indexes: &mut BTreeSet<u32>,
 | 
			
		||||
) -> Vec<(u32, u32)> {
 | 
			
		||||
    let mut first_greater_pos = None;
 | 
			
		||||
    let mut prev_completed_shred_index = None;
 | 
			
		||||
    // Find the first item in `completed_data_indexes > new_shred_index`
 | 
			
		||||
    for (i, completed_data_index) in completed_data_indexes.iter().enumerate() {
 | 
			
		||||
        // `completed_data_indexes` should be sorted from smallest to largest
 | 
			
		||||
        assert!(
 | 
			
		||||
            prev_completed_shred_index.is_none()
 | 
			
		||||
                || *completed_data_index > prev_completed_shred_index.unwrap()
 | 
			
		||||
        );
 | 
			
		||||
        if *completed_data_index > new_shred_index {
 | 
			
		||||
            first_greater_pos = Some(i);
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
        prev_completed_shred_index = Some(*completed_data_index);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let start_shred_index = completed_data_indexes
 | 
			
		||||
        .range(..new_shred_index)
 | 
			
		||||
        .next_back()
 | 
			
		||||
        .map(|index| index + 1)
 | 
			
		||||
        .unwrap_or_default();
 | 
			
		||||
    // Consecutive entries i, k, j in this vector represent potential ranges [i, k),
 | 
			
		||||
    // [k, j) that could be completed data ranges
 | 
			
		||||
    let mut check_ranges: Vec<u32> = vec![prev_completed_shred_index
 | 
			
		||||
        .map(|completed_data_shred_index| completed_data_shred_index + 1)
 | 
			
		||||
        .unwrap_or(0)];
 | 
			
		||||
    let mut first_greater_data_complete_index =
 | 
			
		||||
        first_greater_pos.map(|i| completed_data_indexes[i]);
 | 
			
		||||
 | 
			
		||||
    let mut shred_indices = vec![start_shred_index];
 | 
			
		||||
    // `new_shred_index` is data complete, so need to insert here into the
 | 
			
		||||
    // `completed_data_indexes`
 | 
			
		||||
    if is_last_in_data {
 | 
			
		||||
        if first_greater_pos.is_some() {
 | 
			
		||||
            // If there exists a data complete shred greater than `new_shred_index`,
 | 
			
		||||
            // and the new shred is marked data complete, then the range
 | 
			
		||||
            // [new_shred_index + 1, completed_data_indexes[pos]] may be complete,
 | 
			
		||||
            // so add that range to check
 | 
			
		||||
            check_ranges.push(new_shred_index + 1);
 | 
			
		||||
        completed_data_indexes.insert(new_shred_index);
 | 
			
		||||
        shred_indices.push(new_shred_index + 1);
 | 
			
		||||
    }
 | 
			
		||||
        completed_data_indexes.insert(
 | 
			
		||||
            first_greater_pos.unwrap_or_else(|| {
 | 
			
		||||
                // If `first_greater_pos` is none, then there was no greater
 | 
			
		||||
                // data complete index so mark this new shred's index as the latest data
 | 
			
		||||
                // complete index
 | 
			
		||||
                first_greater_data_complete_index = Some(new_shred_index);
 | 
			
		||||
                completed_data_indexes.len()
 | 
			
		||||
            }),
 | 
			
		||||
            new_shred_index,
 | 
			
		||||
        );
 | 
			
		||||
    if let Some(index) = completed_data_indexes.range(new_shred_index + 1..).next() {
 | 
			
		||||
        shred_indices.push(index + 1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if first_greater_data_complete_index.is_none() {
 | 
			
		||||
        // That means new_shred_index > all known completed data indexes and
 | 
			
		||||
        // new shred not data complete, which means the data set of that new
 | 
			
		||||
        // shred is not data complete
 | 
			
		||||
        return vec![];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    check_ranges.push(first_greater_data_complete_index.unwrap() + 1);
 | 
			
		||||
    let mut completed_data_ranges = vec![];
 | 
			
		||||
    for range in check_ranges.windows(2) {
 | 
			
		||||
        let mut is_complete = true;
 | 
			
		||||
        for shred_index in range[0]..range[1] {
 | 
			
		||||
            // If we're missing any shreds, the data set cannot be confirmed
 | 
			
		||||
            // to be completed, so check the next range
 | 
			
		||||
            if !received_data_shreds.is_present(shred_index as u64) {
 | 
			
		||||
                is_complete = false;
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if is_complete {
 | 
			
		||||
            completed_data_ranges.push((range[0], range[1] - 1));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    completed_data_ranges
 | 
			
		||||
    shred_indices
 | 
			
		||||
        .windows(2)
 | 
			
		||||
        .filter(|ix| {
 | 
			
		||||
            let (begin, end) = (ix[0] as u64, ix[1] as u64);
 | 
			
		||||
            let num_shreds = (end - begin) as usize;
 | 
			
		||||
            received_data_shreds.present_in_bounds(begin..end) == num_shreds
 | 
			
		||||
        })
 | 
			
		||||
        .map(|ix| (ix[0], ix[1] - 1))
 | 
			
		||||
        .collect()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn update_slot_meta(
 | 
			
		||||
@@ -5990,7 +5938,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_get_completed_data_ranges() {
 | 
			
		||||
        let completed_data_end_indexes = vec![2, 4, 9, 11];
 | 
			
		||||
        let completed_data_end_indexes = [2, 4, 9, 11].iter().copied().collect();
 | 
			
		||||
 | 
			
		||||
        // Consumed is 1, which means we're missing shred with index 1, should return empty
 | 
			
		||||
        let start_index = 0;
 | 
			
		||||
@@ -5998,7 +5946,7 @@ pub mod tests {
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            Blockstore::get_completed_data_ranges(
 | 
			
		||||
                start_index,
 | 
			
		||||
                &completed_data_end_indexes[..],
 | 
			
		||||
                &completed_data_end_indexes,
 | 
			
		||||
                consumed
 | 
			
		||||
            ),
 | 
			
		||||
            vec![]
 | 
			
		||||
@@ -6009,7 +5957,7 @@ pub mod tests {
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            Blockstore::get_completed_data_ranges(
 | 
			
		||||
                start_index,
 | 
			
		||||
                &completed_data_end_indexes[..],
 | 
			
		||||
                &completed_data_end_indexes,
 | 
			
		||||
                consumed
 | 
			
		||||
            ),
 | 
			
		||||
            vec![(0, 2)]
 | 
			
		||||
@@ -6022,6 +5970,7 @@ pub mod tests {
 | 
			
		||||
        // range:
 | 
			
		||||
        // [start_index, completed_data_end_indexes[j]] ==
 | 
			
		||||
        // [completed_data_end_indexes[i], completed_data_end_indexes[j]],
 | 
			
		||||
        let completed_data_end_indexes: Vec<_> = completed_data_end_indexes.into_iter().collect();
 | 
			
		||||
        for i in 0..completed_data_end_indexes.len() {
 | 
			
		||||
            for j in i..completed_data_end_indexes.len() {
 | 
			
		||||
                let start_index = completed_data_end_indexes[i];
 | 
			
		||||
@@ -6036,10 +5985,12 @@ pub mod tests {
 | 
			
		||||
                        .map(|end_indexes| (end_indexes[0] + 1, end_indexes[1])),
 | 
			
		||||
                );
 | 
			
		||||
 | 
			
		||||
                let completed_data_end_indexes =
 | 
			
		||||
                    completed_data_end_indexes.iter().copied().collect();
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    Blockstore::get_completed_data_ranges(
 | 
			
		||||
                        start_index,
 | 
			
		||||
                        &completed_data_end_indexes[..],
 | 
			
		||||
                        &completed_data_end_indexes,
 | 
			
		||||
                        consumed
 | 
			
		||||
                    ),
 | 
			
		||||
                    expected
 | 
			
		||||
@@ -8308,7 +8259,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_update_completed_data_indexes() {
 | 
			
		||||
        let mut completed_data_indexes: Vec<u32> = vec![];
 | 
			
		||||
        let mut completed_data_indexes = BTreeSet::default();
 | 
			
		||||
        let mut shred_index = ShredIndex::default();
 | 
			
		||||
 | 
			
		||||
        for i in 0..10 {
 | 
			
		||||
@@ -8317,13 +8268,13 @@ pub mod tests {
 | 
			
		||||
                update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
 | 
			
		||||
                vec![(i, i)]
 | 
			
		||||
            );
 | 
			
		||||
            assert_eq!(completed_data_indexes, (0..=i).collect::<Vec<u32>>());
 | 
			
		||||
            assert!(completed_data_indexes.iter().copied().eq(0..=i));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_update_completed_data_indexes_out_of_order() {
 | 
			
		||||
        let mut completed_data_indexes = vec![];
 | 
			
		||||
        let mut completed_data_indexes = BTreeSet::default();
 | 
			
		||||
        let mut shred_index = ShredIndex::default();
 | 
			
		||||
 | 
			
		||||
        shred_index.set_present(4, true);
 | 
			
		||||
@@ -8345,7 +8296,7 @@ pub mod tests {
 | 
			
		||||
            update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
 | 
			
		||||
                .is_empty()
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(completed_data_indexes, vec![3]);
 | 
			
		||||
        assert!(completed_data_indexes.iter().eq([3].iter()));
 | 
			
		||||
 | 
			
		||||
        // Inserting data complete shred 1 now confirms the range of shreds [2, 3]
 | 
			
		||||
        // is part of the same data set
 | 
			
		||||
@@ -8354,7 +8305,7 @@ pub mod tests {
 | 
			
		||||
            update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
 | 
			
		||||
            vec![(2, 3)]
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(completed_data_indexes, vec![1, 3]);
 | 
			
		||||
        assert!(completed_data_indexes.iter().eq([1, 3].iter()));
 | 
			
		||||
 | 
			
		||||
        // Inserting data complete shred 0 now confirms the range of shreds [0]
 | 
			
		||||
        // is part of the same data set
 | 
			
		||||
@@ -8363,7 +8314,7 @@ pub mod tests {
 | 
			
		||||
            update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
 | 
			
		||||
            vec![(0, 0), (1, 1)]
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(completed_data_indexes, vec![0, 1, 3]);
 | 
			
		||||
        assert!(completed_data_indexes.iter().eq([0, 1, 3].iter()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
 
 | 
			
		||||
@@ -28,8 +28,8 @@ pub struct SlotMeta {
 | 
			
		||||
    // True if this slot is full (consumed == last_index + 1) and if every
 | 
			
		||||
    // slot that is a parent of this slot is also connected.
 | 
			
		||||
    pub is_connected: bool,
 | 
			
		||||
    // List of start indexes for completed data slots
 | 
			
		||||
    pub completed_data_indexes: Vec<u32>,
 | 
			
		||||
    // Shred indices which are marked data complete.
 | 
			
		||||
    pub completed_data_indexes: BTreeSet<u32>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 | 
			
		||||
@@ -199,14 +199,10 @@ impl SlotMeta {
 | 
			
		||||
    pub(crate) fn new(slot: Slot, parent_slot: Slot) -> Self {
 | 
			
		||||
        SlotMeta {
 | 
			
		||||
            slot,
 | 
			
		||||
            consumed: 0,
 | 
			
		||||
            received: 0,
 | 
			
		||||
            first_shred_timestamp: 0,
 | 
			
		||||
            parent_slot,
 | 
			
		||||
            next_slots: vec![],
 | 
			
		||||
            is_connected: slot == 0,
 | 
			
		||||
            last_index: std::u64::MAX,
 | 
			
		||||
            completed_data_indexes: vec![],
 | 
			
		||||
            ..SlotMeta::default()
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user