diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index a7b98eb0b8..8c1d1ca6d7 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -547,6 +547,154 @@ impl Blockstore {
         Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
     }
 
+    fn get_recovery_data_shreds(
+        index: &mut Index,
+        set_index: u64,
+        slot: Slot,
+        erasure_meta: &ErasureMeta,
+        available_shreds: &mut Vec<Shred>,
+        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
+        data_cf: &LedgerColumn<cf::ShredData>,
+    ) {
+        (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
+            if index.data().is_present(i) {
+                if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
+                    let some_data = data_cf
+                        .get_bytes((slot, i))
+                        .expect("Database failure, could not fetch data shred");
+                    if let Some(data) = some_data {
+                        Shred::new_from_serialized_shred(data).ok()
+                    } else {
+                        warn!("Data shred deleted while reading for recovery");
+                        None
+                    }
+                }) {
+                    available_shreds.push(shred);
+                }
+            }
+        });
+    }
+
+    fn get_recovery_coding_shreds(
+        index: &mut Index,
+        slot: Slot,
+        erasure_meta: &ErasureMeta,
+        available_shreds: &mut Vec<Shred>,
+        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
+        code_cf: &LedgerColumn<cf::ShredCode>,
+    ) {
+        (erasure_meta.first_coding_index
+            ..erasure_meta.first_coding_index + erasure_meta.config.num_coding() as u64)
+            .for_each(|i| {
+                if let Some(shred) = prev_inserted_codes
+                    .remove(&(slot, i))
+                    .map(|s| {
+                        // Remove from the index so it doesn't get committed. We know
+                        // this is safe to do because everything in
+                        // `prev_inserted_codes` does not yet exist in blockstore
+                        // (guaranteed by `check_cache_coding_shred`)
+                        index.coding_mut().set_present(i, false);
+                        s
+                    })
+                    .or_else(|| {
+                        if index.coding().is_present(i) {
+                            let some_code = code_cf
+                                .get_bytes((slot, i))
+                                .expect("Database failure, could not fetch code shred");
+                            if let Some(code) = some_code {
+                                Shred::new_from_serialized_shred(code).ok()
+                            } else {
+                                warn!("Code shred deleted while reading for recovery");
+                                None
+                            }
+                        } else {
+                            None
+                        }
+                    })
+                {
+                    available_shreds.push(shred);
+                }
+            });
+    }
+
+    fn recover_shreds(
+        index: &mut Index,
+        set_index: u64,
+        erasure_meta: &ErasureMeta,
+        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
+        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
+        recovered_data_shreds: &mut Vec<Shred>,
+        data_cf: &LedgerColumn<cf::ShredData>,
+        code_cf: &LedgerColumn<cf::ShredCode>,
+    ) {
+        // Find shreds for this erasure set and try recovery
+        let slot = index.slot;
+        let mut available_shreds = vec![];
+
+        Self::get_recovery_data_shreds(
+            index,
+            set_index,
+            slot,
+            erasure_meta,
+            &mut available_shreds,
+            prev_inserted_datas,
+            data_cf,
+        );
+
+        Self::get_recovery_coding_shreds(
+            index,
+            slot,
+            erasure_meta,
+            &mut available_shreds,
+            prev_inserted_codes,
+            code_cf,
+        );
+
+        if let Ok(mut result) = Shredder::try_recovery(
+            available_shreds,
+            erasure_meta.config.num_data(),
+            erasure_meta.config.num_coding(),
+            set_index as usize,
+            erasure_meta.first_coding_index as usize,
+            slot,
+        ) {
+            Self::submit_metrics(
+                slot,
+                set_index,
+                erasure_meta,
+                true,
+                "complete".into(),
+                result.len(),
+            );
+            recovered_data_shreds.append(&mut result);
+        } else {
+            Self::submit_metrics(slot, set_index, erasure_meta, true, "incomplete".into(), 0);
+        }
+    }
+
+    fn submit_metrics(
+        slot: Slot,
+        set_index: u64,
+        erasure_meta: &ErasureMeta,
+        attempted: bool,
+        status: String,
+        recovered: usize,
+    ) {
+        datapoint_debug!(
+            "blockstore-erasure",
+            ("slot", slot as i64, i64),
+            ("start_index", set_index as i64, i64),
+            (
+                "end_index",
+                (erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
+                i64
+            ),
+            ("recovery_attempted", attempted, bool),
+            ("recovery_status", status, String),
+            ("recovered", recovered as i64, i64),
+        );
+    }
+
     fn try_shred_recovery(
         db: &Database,
         erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
@@ -563,94 +711,20 @@ impl Blockstore {
         // 3. Before trying recovery, check if enough number of shreds have been received
         // 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
         for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
-            let submit_metrics = |attempted: bool, status: String, recovered: usize| {
-                datapoint_debug!(
-                    "blockstore-erasure",
-                    ("slot", slot as i64, i64),
-                    ("start_index", set_index as i64, i64),
-                    (
-                        "end_index",
-                        (erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
-                        i64
-                    ),
-                    ("recovery_attempted", attempted, bool),
-                    ("recovery_status", status, String),
-                    ("recovered", recovered as i64, i64),
-                );
-            };
-
             let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
             let index = &mut index_meta_entry.index;
             match erasure_meta.status(&index) {
                 ErasureMetaStatus::CanRecover => {
-                    // Find shreds for this erasure set and try recovery
-                    let slot = index.slot;
-                    let mut available_shreds = vec![];
-                    (set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
-                        if index.data().is_present(i) {
-                            if let Some(shred) =
-                                prev_inserted_datas.remove(&(slot, i)).or_else(|| {
-                                    let some_data = data_cf
-                                        .get_bytes((slot, i))
-                                        .expect("Database failure, could not fetch data shred");
-                                    if let Some(data) = some_data {
-                                        Shred::new_from_serialized_shred(data).ok()
-                                    } else {
-                                        warn!("Data shred deleted while reading for recovery");
-                                        None
-                                    }
-                                })
-                            {
-                                available_shreds.push(shred);
-                            }
-                        }
-                    });
-                    (erasure_meta.first_coding_index
-                        ..erasure_meta.first_coding_index
-                            + erasure_meta.config.num_coding() as u64)
-                        .for_each(|i| {
-                            if let Some(shred) = prev_inserted_codes
-                                .remove(&(slot, i))
-                                .map(|s| {
-                                    // Remove from the index so it doesn't get committed. We know
-                                    // this is safe to do because everything in
-                                    // `prev_inserted_codes` does not yet exist in blockstore
-                                    // (guaranteed by `check_cache_coding_shred`)
-                                    index.coding_mut().set_present(i, false);
-                                    s
-                                })
-                                .or_else(|| {
-                                    if index.coding().is_present(i) {
-                                        let some_code = code_cf
-                                            .get_bytes((slot, i))
-                                            .expect("Database failure, could not fetch code shred");
-                                        if let Some(code) = some_code {
-                                            Shred::new_from_serialized_shred(code).ok()
-                                        } else {
-                                            warn!("Code shred deleted while reading for recovery");
-                                            None
-                                        }
-                                    } else {
-                                        None
-                                    }
-                                })
-                            {
-                                available_shreds.push(shred);
-                            }
-                        });
-                    if let Ok(mut result) = Shredder::try_recovery(
-                        available_shreds,
-                        erasure_meta.config.num_data(),
-                        erasure_meta.config.num_coding(),
-                        set_index as usize,
-                        erasure_meta.first_coding_index as usize,
-                        slot,
-                    ) {
-                        submit_metrics(true, "complete".into(), result.len());
-                        recovered_data_shreds.append(&mut result);
-                    } else {
-                        submit_metrics(true, "incomplete".into(), 0);
-                    }
+                    Self::recover_shreds(
+                        index,
+                        set_index,
+                        erasure_meta,
+                        prev_inserted_datas,
+                        prev_inserted_codes,
+                        &mut recovered_data_shreds,
+                        &data_cf,
+                        &code_cf,
+                    );
                 }
                 ErasureMetaStatus::DataFull => {
                     (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
@@ -665,10 +739,24 @@ impl Blockstore {
                            }
                        },
                    );
-                    submit_metrics(false, "complete".into(), 0);
+                    Self::submit_metrics(
+                        slot,
+                        set_index,
+                        erasure_meta,
+                        false,
+                        "complete".into(),
+                        0,
+                    );
                 }
                 ErasureMetaStatus::StillNeed(needed) => {
-                    submit_metrics(false, format!("still need: {}", needed), 0);
+                    Self::submit_metrics(
+                        slot,
+                        set_index,
+                        erasure_meta,
+                        false,
+                        format!("still need: {}", needed),
+                        0,
+                    );
                 }
             };
         }
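As a quick illustration of the lookup pattern the new `get_recovery_data_shreds` and `get_recovery_coding_shreds` helpers share (take a shred for each index from the batch's in-memory cache first, removing it so it is not re-committed, and fall back to the persistent column store before attempting recovery), here is a minimal, self-contained sketch. The `ToyShred`, `ToyStore`, and `gather_available` names are invented stand-ins for this example and are not part of the blockstore API.

```rust
use std::collections::HashMap;

type Slot = u64;

#[derive(Clone, Debug)]
struct ToyShred {
    slot: Slot,
    index: u64,
}

/// Stand-in for a persistent LedgerColumn: a plain map keyed by (slot, index).
struct ToyStore(HashMap<(Slot, u64), ToyShred>);

impl ToyStore {
    fn get(&self, key: (Slot, u64)) -> Option<ToyShred> {
        self.0.get(&key).cloned()
    }
}

/// Gather the shreds of one erasure set, preferring the cache of shreds
/// inserted in the current batch and falling back to the persistent store.
fn gather_available(
    slot: Slot,
    set_index: u64,
    num_data: u64,
    cache: &mut HashMap<(Slot, u64), ToyShred>,
    store: &ToyStore,
) -> Vec<ToyShred> {
    let mut available = Vec::new();
    for i in set_index..set_index + num_data {
        // Prefer (and consume) the cached copy; otherwise read from the store.
        if let Some(shred) = cache.remove(&(slot, i)).or_else(|| store.get((slot, i))) {
            available.push(shred);
        }
    }
    available
}

fn main() {
    let slot = 7;
    let mut cache = HashMap::new();
    cache.insert((slot, 0), ToyShred { slot, index: 0 });

    let mut persisted = HashMap::new();
    persisted.insert((slot, 1), ToyShred { slot, index: 1 });
    let store = ToyStore(persisted);

    let available = gather_available(slot, 0, 3, &mut cache, &store);
    // Index 2 is missing, so only two of the three data shreds are available;
    // the real code would then consult the coding shreds before calling
    // `Shredder::try_recovery`.
    println!("available: {} of 3", available.len());
}
```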