@@ -61,6 +61,11 @@ pub struct Blocktree {
 | 
				
			|||||||
    pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
 | 
					    pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct IndexMetaWorkingSetEntry {
 | 
				
			||||||
 | 
					    index: Index,
 | 
				
			||||||
 | 
					    did_insert_occur: bool,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct BlocktreeInsertionMetrics {
 | 
					pub struct BlocktreeInsertionMetrics {
 | 
				
			||||||
    pub num_shreds: usize,
 | 
					    pub num_shreds: usize,
 | 
				
			||||||
    pub insert_lock_elapsed: u64,
 | 
					    pub insert_lock_elapsed: u64,
 | 
				
			||||||
@@ -72,6 +77,7 @@ pub struct BlocktreeInsertionMetrics {
 | 
				
			|||||||
    pub total_elapsed: u64,
 | 
					    pub total_elapsed: u64,
 | 
				
			||||||
    pub num_inserted: u64,
 | 
					    pub num_inserted: u64,
 | 
				
			||||||
    pub num_recovered: usize,
 | 
					    pub num_recovered: usize,
 | 
				
			||||||
 | 
					    pub index_meta_time: u64,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl BlocktreeInsertionMetrics {
 | 
					impl BlocktreeInsertionMetrics {
 | 
				
			||||||
@@ -305,7 +311,7 @@ impl Blocktree {
 | 
				
			|||||||
    fn try_shred_recovery(
 | 
					    fn try_shred_recovery(
 | 
				
			||||||
        db: &Database,
 | 
					        db: &Database,
 | 
				
			||||||
        erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
 | 
					        erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
 | 
				
			||||||
        index_working_set: &HashMap<u64, Index>,
 | 
					        index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
 | 
				
			||||||
        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
 | 
					        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
 | 
				
			||||||
        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
 | 
					        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
 | 
				
			||||||
    ) -> Vec<Shred> {
 | 
					    ) -> Vec<Shred> {
 | 
				
			||||||
@@ -330,7 +336,8 @@ impl Blocktree {
 | 
				
			|||||||
                );
 | 
					                );
 | 
				
			||||||
            };
 | 
					            };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            let index = index_working_set.get(&slot).expect("Index");
 | 
					            let index_meta_entry = index_working_set.get(&slot).expect("Index");
 | 
				
			||||||
 | 
					            let index = &index_meta_entry.index;
 | 
				
			||||||
            match erasure_meta.status(&index) {
 | 
					            match erasure_meta.status(&index) {
 | 
				
			||||||
                ErasureMetaStatus::CanRecover => {
 | 
					                ErasureMetaStatus::CanRecover => {
 | 
				
			||||||
                    // Find shreds for this erasure set and try recovery
 | 
					                    // Find shreds for this erasure set and try recovery
 | 
				
			||||||
@@ -423,6 +430,7 @@ impl Blocktree {
 | 
				
			|||||||
        let num_shreds = shreds.len();
 | 
					        let num_shreds = shreds.len();
 | 
				
			||||||
        let mut start = Measure::start("Shred insertion");
 | 
					        let mut start = Measure::start("Shred insertion");
 | 
				
			||||||
        let mut num_inserted = 0;
 | 
					        let mut num_inserted = 0;
 | 
				
			||||||
 | 
					        let mut index_meta_time = 0;
 | 
				
			||||||
        shreds.into_iter().for_each(|shred| {
 | 
					        shreds.into_iter().for_each(|shred| {
 | 
				
			||||||
            let insert_success = {
 | 
					            let insert_success = {
 | 
				
			||||||
                if shred.is_data() {
 | 
					                if shred.is_data() {
 | 
				
			||||||
@@ -432,6 +440,7 @@ impl Blocktree {
 | 
				
			|||||||
                        &mut slot_meta_working_set,
 | 
					                        &mut slot_meta_working_set,
 | 
				
			||||||
                        &mut write_batch,
 | 
					                        &mut write_batch,
 | 
				
			||||||
                        &mut just_inserted_data_shreds,
 | 
					                        &mut just_inserted_data_shreds,
 | 
				
			||||||
 | 
					                        &mut index_meta_time,
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                } else if shred.is_code() {
 | 
					                } else if shred.is_code() {
 | 
				
			||||||
                    self.check_insert_coding_shred(
 | 
					                    self.check_insert_coding_shred(
 | 
				
			||||||
@@ -440,6 +449,7 @@ impl Blocktree {
 | 
				
			|||||||
                        &mut index_working_set,
 | 
					                        &mut index_working_set,
 | 
				
			||||||
                        &mut write_batch,
 | 
					                        &mut write_batch,
 | 
				
			||||||
                        &mut just_inserted_coding_shreds,
 | 
					                        &mut just_inserted_coding_shreds,
 | 
				
			||||||
 | 
					                        &mut index_meta_time,
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    panic!("There should be no other case");
 | 
					                    panic!("There should be no other case");
 | 
				
			||||||
@@ -473,6 +483,7 @@ impl Blocktree {
 | 
				
			|||||||
                            &mut slot_meta_working_set,
 | 
					                            &mut slot_meta_working_set,
 | 
				
			||||||
                            &mut write_batch,
 | 
					                            &mut write_batch,
 | 
				
			||||||
                            &mut just_inserted_coding_shreds,
 | 
					                            &mut just_inserted_coding_shreds,
 | 
				
			||||||
 | 
					                            &mut index_meta_time,
 | 
				
			||||||
                        );
 | 
					                        );
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
@@ -498,8 +509,10 @@ impl Blocktree {
 | 
				
			|||||||
            write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 | 
					            write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (&slot, index) in index_working_set.iter() {
 | 
					        for (&slot, index_working_set_entry) in index_working_set.iter() {
 | 
				
			||||||
            write_batch.put::<cf::Index>(slot, index)?;
 | 
					            if index_working_set_entry.did_insert_occur {
 | 
				
			||||||
 | 
					                write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        start.stop();
 | 
					        start.stop();
 | 
				
			||||||
        let commit_working_sets_elapsed = start.as_us();
 | 
					        let commit_working_sets_elapsed = start.as_us();
 | 
				
			||||||
@@ -535,6 +548,7 @@ impl Blocktree {
 | 
				
			|||||||
            write_batch_elapsed,
 | 
					            write_batch_elapsed,
 | 
				
			||||||
            num_inserted,
 | 
					            num_inserted,
 | 
				
			||||||
            num_recovered,
 | 
					            num_recovered,
 | 
				
			||||||
 | 
					            index_meta_time,
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -542,26 +556,29 @@ impl Blocktree {
 | 
				
			|||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        shred: Shred,
 | 
					        shred: Shred,
 | 
				
			||||||
        erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
 | 
					        erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
 | 
				
			||||||
        index_working_set: &mut HashMap<u64, Index>,
 | 
					        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
 | 
				
			||||||
        write_batch: &mut WriteBatch,
 | 
					        write_batch: &mut WriteBatch,
 | 
				
			||||||
        just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
 | 
					        just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
 | 
				
			||||||
 | 
					        index_meta_time: &mut u64,
 | 
				
			||||||
    ) -> bool {
 | 
					    ) -> bool {
 | 
				
			||||||
        let slot = shred.slot();
 | 
					        let slot = shred.slot();
 | 
				
			||||||
        let shred_index = u64::from(shred.index());
 | 
					        let shred_index = u64::from(shred.index());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let (index_meta, mut new_index_meta) =
 | 
					        let index_meta_working_set_entry =
 | 
				
			||||||
            get_index_meta_entry(&self.db, slot, index_working_set);
 | 
					            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
 | 
					        let index_meta = &mut index_meta_working_set_entry.index;
 | 
				
			||||||
        // This gives the index of first coding shred in this FEC block
 | 
					        // This gives the index of first coding shred in this FEC block
 | 
				
			||||||
        // So, all coding shreds in a given FEC block will have the same set index
 | 
					        // So, all coding shreds in a given FEC block will have the same set index
 | 
				
			||||||
        if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
 | 
					        if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
 | 
				
			||||||
            self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
 | 
					            self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
 | 
				
			||||||
                .map(|_| {
 | 
					                .map(|_| {
 | 
				
			||||||
 | 
					                    // Insert was a success!
 | 
				
			||||||
                    just_inserted_coding_shreds
 | 
					                    just_inserted_coding_shreds
 | 
				
			||||||
                        .entry((slot, shred_index))
 | 
					                        .entry((slot, shred_index))
 | 
				
			||||||
                        .or_insert_with(|| shred);
 | 
					                        .or_insert_with(|| shred);
 | 
				
			||||||
                    new_index_meta.map(|n| index_working_set.insert(slot, n))
 | 
					
 | 
				
			||||||
 | 
					                    index_meta_working_set_entry.did_insert_occur = true;
 | 
				
			||||||
                })
 | 
					                })
 | 
				
			||||||
                .is_ok()
 | 
					                .is_ok()
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
@@ -572,20 +589,23 @@ impl Blocktree {
 | 
				
			|||||||
    fn check_insert_data_shred(
 | 
					    fn check_insert_data_shred(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        shred: Shred,
 | 
					        shred: Shred,
 | 
				
			||||||
        index_working_set: &mut HashMap<u64, Index>,
 | 
					        index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
 | 
				
			||||||
        slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
 | 
					        slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
 | 
				
			||||||
        write_batch: &mut WriteBatch,
 | 
					        write_batch: &mut WriteBatch,
 | 
				
			||||||
        just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
 | 
					        just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
 | 
				
			||||||
 | 
					        index_meta_time: &mut u64,
 | 
				
			||||||
    ) -> bool {
 | 
					    ) -> bool {
 | 
				
			||||||
        let slot = shred.slot();
 | 
					        let slot = shred.slot();
 | 
				
			||||||
        let shred_index = u64::from(shred.index());
 | 
					        let shred_index = u64::from(shred.index());
 | 
				
			||||||
        let (index_meta, mut new_index_meta) =
 | 
					
 | 
				
			||||||
            get_index_meta_entry(&self.db, slot, index_working_set);
 | 
					        let index_meta_working_set_entry =
 | 
				
			||||||
 | 
					            get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let index_meta = &mut index_meta_working_set_entry.index;
 | 
				
			||||||
        let (slot_meta_entry, mut new_slot_meta_entry) =
 | 
					        let (slot_meta_entry, mut new_slot_meta_entry) =
 | 
				
			||||||
            get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
 | 
					            get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let insert_success = {
 | 
					        let insert_success = {
 | 
				
			||||||
            let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
 | 
					 | 
				
			||||||
            let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
 | 
					            let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
 | 
				
			||||||
            let mut slot_meta = entry.0.borrow_mut();
 | 
					            let mut slot_meta = entry.0.borrow_mut();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -602,7 +622,7 @@ impl Blocktree {
 | 
				
			|||||||
                    write_batch,
 | 
					                    write_batch,
 | 
				
			||||||
                ) {
 | 
					                ) {
 | 
				
			||||||
                    just_inserted_data_shreds.insert((slot, shred_index), shred);
 | 
					                    just_inserted_data_shreds.insert((slot, shred_index), shred);
 | 
				
			||||||
                    new_index_meta.map(|n| index_working_set.insert(slot, n));
 | 
					                    index_meta_working_set_entry.did_insert_occur = true;
 | 
				
			||||||
                    true
 | 
					                    true
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    false
 | 
					                    false
 | 
				
			||||||
@@ -1314,22 +1334,24 @@ fn update_slot_meta(
 | 
				
			|||||||
fn get_index_meta_entry<'a>(
 | 
					fn get_index_meta_entry<'a>(
 | 
				
			||||||
    db: &Database,
 | 
					    db: &Database,
 | 
				
			||||||
    slot: u64,
 | 
					    slot: u64,
 | 
				
			||||||
    index_working_set: &'a mut HashMap<u64, Index>,
 | 
					    index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
 | 
				
			||||||
) -> (Option<&'a mut Index>, Option<Index>) {
 | 
					    index_meta_time: &mut u64,
 | 
				
			||||||
 | 
					) -> &'a mut IndexMetaWorkingSetEntry {
 | 
				
			||||||
    let index_cf = db.column::<cf::Index>();
 | 
					    let index_cf = db.column::<cf::Index>();
 | 
				
			||||||
    index_working_set
 | 
					    let mut total_start = Measure::start("Total elapsed");
 | 
				
			||||||
        .get_mut(&slot)
 | 
					    let res = index_working_set.entry(slot).or_insert_with(|| {
 | 
				
			||||||
        .map(|i| (Some(i), None))
 | 
					        let newly_inserted_meta = index_cf
 | 
				
			||||||
        .unwrap_or_else(|| {
 | 
					            .get(slot)
 | 
				
			||||||
            let newly_inserted_meta = Some(
 | 
					            .unwrap()
 | 
				
			||||||
                index_cf
 | 
					            .unwrap_or_else(|| Index::new(slot));
 | 
				
			||||||
                    .get(slot)
 | 
					        IndexMetaWorkingSetEntry {
 | 
				
			||||||
                    .unwrap()
 | 
					            index: newly_inserted_meta,
 | 
				
			||||||
                    .unwrap_or_else(|| Index::new(slot)),
 | 
					            did_insert_occur: false,
 | 
				
			||||||
            );
 | 
					        }
 | 
				
			||||||
 | 
					    });
 | 
				
			||||||
            (None, newly_inserted_meta)
 | 
					    total_start.stop();
 | 
				
			||||||
        })
 | 
					    *index_meta_time += total_start.as_us();
 | 
				
			||||||
 | 
					    res
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn get_slot_meta_entry<'a>(
 | 
					fn get_slot_meta_entry<'a>(
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user