diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index b0e143e0db..49f6db56c8 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -2121,6 +2121,165 @@ pub mod tests {
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_find_missing_data_indexes() {
+        let slot = 0;
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+
+        // Write entries
+        let gap = 10;
+        assert!(gap > 3);
+        let num_entries = 10;
+        let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
+        for (i, b) in blobs.iter_mut().enumerate() {
+            b.set_index(i as u64 * gap);
+            b.set_slot(slot);
+        }
+        blocktree.write_blobs(&blobs).unwrap();
+
+        // Index of the first blob is 0
+        // Index of the second blob is "gap"
+        // Thus, the missing indexes should then be [1, gap - 1] for the input index
+        // range of [0, gap)
+        let expected: Vec<u64> = (1..gap).collect();
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
+            expected
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
+            expected,
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
+            &expected[..expected.len() - 1],
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
+            vec![gap - 2, gap - 1],
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
+            vec![gap - 2],
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 0, gap, 1),
+            vec![1],
+        );
+
+        // Test with end indexes that are greater than the last item in the ledger
+        let mut expected: Vec<u64> = (1..gap).collect();
+        expected.push(gap + 1);
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
+            expected,
+        );
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
+            &expected[..expected.len() - 1],
+        );
+
+        for i in 0..num_entries as u64 {
+            for j in 0..i {
+                let expected: Vec<u64> = (j..i)
+                    .flat_map(|k| {
+                        let begin = k * gap + 1;
+                        let end = (k + 1) * gap;
+                        (begin..end)
+                    })
+                    .collect();
+                assert_eq!(
+                    blocktree.find_missing_data_indexes(
+                        slot,
+                        j * gap,
+                        i * gap,
+                        ((i - j) * gap) as usize
+                    ),
+                    expected,
+                );
+            }
+        }
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    fn test_find_missing_data_indexes_sanity() {
+        let slot = 0;
+
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+
+        // Early exit conditions
+        let empty: Vec<u64> = vec![];
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
+
+        let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
+
+        const ONE: u64 = 1;
+        const OTHER: u64 = 4;
+
+        blobs[0].set_index(ONE);
+        blobs[1].set_index(OTHER);
+
+        // Insert one blob at index = first_index
+        blocktree.write_blobs(&blobs).unwrap();
+
+        const STARTS: u64 = OTHER * 2;
+        const END: u64 = OTHER * 3;
+        const MAX: usize = 10;
+        // The first blob has index = first_index. Thus, for i < first_index,
+        // given the input range of [i, first_index], the missing indexes should be
+        // [i, first_index - 1]
+        for start in 0..STARTS {
+            let result = blocktree.find_missing_data_indexes(
+                slot, start, // start
+                END, //end
+                MAX, //max
+            );
+            let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
+            assert_eq!(result, expected);
+        }
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    pub fn test_no_missing_blob_indexes() {
+        let slot = 0;
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+
+        // Write entries
+        let num_entries = 10;
+        let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
+
+        crate::packet::index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
+
+        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
+        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
+        blocktree.write_blobs(blobs).unwrap();
+
+        let empty: Vec<u64> = vec![];
+        for i in 0..num_entries as u64 {
+            for j in 0..i {
+                assert_eq!(
+                    blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
+                    empty
+                );
+            }
+        }
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     pub fn entries_to_blobs(
         entries: &Vec<Entry>,
         slot: u64,
diff --git a/core/src/db_window.rs b/core/src/db_window.rs
index d6a7330107..ea6475666f 100644
--- a/core/src/db_window.rs
+++ b/core/src/db_window.rs
@@ -1,7 +1,5 @@
 //! Set of functions for emulating windowing functions from a database ledger implementation
 use crate::blocktree::*;
-#[cfg(feature = "erasure")]
-use crate::erasure;
 use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
 use crate::result::Result;
 use crate::streamer::BlobSender;
@@ -49,346 +47,18 @@ pub fn process_blob(blocktree: &Arc<Blocktree>, blob: &SharedBlob) -> Result<()> {
         blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?;
     }
 
-    #[cfg(feature = "erasure")]
-    {
-        // TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441
-        if let Err(e) = try_erasure(blocktree, 0) {
-            trace!(
-                "erasure::recover failed to write recovered coding blobs. Err: {:?}",
Err: {:?}", - e - ); - } - } - Ok(()) } -#[cfg(feature = "erasure")] -fn try_erasure(blocktree: &Arc, slot_index: u64) -> Result<()> { - let meta = blocktree.meta(slot_index)?; - - if let Some(meta) = meta { - let (data, coding) = erasure::recover(blocktree, slot_index, meta.consumed)?; - for c in coding { - let c = c.read().unwrap(); - blocktree.put_coding_blob_bytes( - 0, - c.index(), - &c.data[..BLOB_HEADER_SIZE + c.size()], - )?; - } - - blocktree.write_shared_blobs(data) - } else { - Ok(()) - } -} - #[cfg(test)] mod test { use super::*; use crate::blocktree::get_tmp_ledger_path; - #[cfg(all(feature = "erasure", test))] - use crate::entry::reconstruct_entries_from_blobs; use crate::entry::{make_tiny_test_entries, EntrySlice}; - #[cfg(all(feature = "erasure", test))] - use crate::erasure::test::{generate_blocktree_from_window, setup_window_ledger}; - #[cfg(all(feature = "erasure", test))] - use crate::erasure::{NUM_CODING, NUM_DATA}; - use crate::packet::{index_blobs, Blob}; + use crate::packet::index_blobs; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::Arc; - #[test] - pub fn test_find_missing_data_indexes_sanity() { - let slot = 0; - - let blocktree_path = get_tmp_ledger_path!(); - let blocktree = Blocktree::open(&blocktree_path).unwrap(); - - // Early exit conditions - let empty: Vec = vec![]; - assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty); - assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty); - - let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs(); - - const ONE: u64 = 1; - const OTHER: u64 = 4; - - blobs[0].set_index(ONE); - blobs[1].set_index(OTHER); - - // Insert one blob at index = first_index - blocktree.write_blobs(&blobs).unwrap(); - - const STARTS: u64 = OTHER * 2; - const END: u64 = OTHER * 3; - const MAX: usize = 10; - // The first blob has index = first_index. 
-        // given the input range of [i, first_index], the missing indexes should be
-        // [i, first_index - 1]
-        for start in 0..STARTS {
-            let result = blocktree.find_missing_data_indexes(
-                slot, start, // start
-                END, //end
-                MAX, //max
-            );
-            let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
-            assert_eq!(result, expected);
-        }
-
-        drop(blocktree);
-        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
-    }
-
-    #[test]
-    pub fn test_find_missing_data_indexes() {
-        let slot = 0;
-        let blocktree_path = get_tmp_ledger_path!();
-        let blocktree = Blocktree::open(&blocktree_path).unwrap();
-
-        // Write entries
-        let gap = 10;
-        assert!(gap > 3);
-        let num_entries = 10;
-        let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
-        for (i, b) in blobs.iter_mut().enumerate() {
-            b.set_index(i as u64 * gap);
-            b.set_slot(slot);
-        }
-        blocktree.write_blobs(&blobs).unwrap();
-
-        // Index of the first blob is 0
-        // Index of the second blob is "gap"
-        // Thus, the missing indexes should then be [1, gap - 1] for the input index
-        // range of [0, gap)
-        let expected: Vec<u64> = (1..gap).collect();
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
-            expected
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
-            expected,
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
-            &expected[..expected.len() - 1],
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
-            vec![gap - 2, gap - 1],
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
-            vec![gap - 2],
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap, 1),
-            vec![1],
-        );
-
-        // Test with end indexes that are greater than the last item in the ledger
-        let mut expected: Vec<u64> = (1..gap).collect();
-        expected.push(gap + 1);
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
-            expected,
-        );
-        assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
-            &expected[..expected.len() - 1],
-        );
-
-        for i in 0..num_entries as u64 {
-            for j in 0..i {
-                let expected: Vec<u64> = (j..i)
-                    .flat_map(|k| {
-                        let begin = k * gap + 1;
-                        let end = (k + 1) * gap;
-                        (begin..end)
-                    })
-                    .collect();
-                assert_eq!(
-                    blocktree.find_missing_data_indexes(
-                        slot,
-                        j * gap,
-                        i * gap,
-                        ((i - j) * gap) as usize
-                    ),
-                    expected,
-                );
-            }
-        }
-
-        drop(blocktree);
-        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
-    }
-
-    #[test]
-    pub fn test_find_missing_data_indexes_slots() {
-        let blocktree_path = get_tmp_ledger_path!();
-        let blocktree = Blocktree::open(&blocktree_path).unwrap();
-
-        let num_entries_per_slot = 10;
-        let num_slots = 2;
-        let mut blobs =
-            make_tiny_test_entries(num_slots * num_entries_per_slot).to_single_entry_blobs();
-
-        // Insert every nth entry for each slot
-        let nth = 3;
-        for (i, b) in blobs.iter_mut().enumerate() {
-            b.set_index(((i % num_entries_per_slot) * nth) as u64);
-            b.set_slot((i / num_entries_per_slot) as u64);
-        }
-
-        blocktree.write_blobs(&blobs).unwrap();
-
-        let mut expected: Vec<u64> = (0..num_entries_per_slot)
-            .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
-            .collect();
-
-        // For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth]
-        for slot in 0..num_slots {
-            assert_eq!(
-                blocktree.find_missing_data_indexes(
-                    slot as u64,
-                    0,
-                    (num_entries_per_slot * nth) as u64,
-                    num_entries_per_slot * nth as usize
-                ),
-                expected,
-            );
-        }
-
-        // Test with a limit on the number of returned entries
-        for slot in 0..num_slots {
-            assert_eq!(
-                blocktree.find_missing_data_indexes(
-                    slot as u64,
-                    0,
-                    (num_entries_per_slot * nth) as u64,
-                    num_entries_per_slot * (nth - 1)
-                )[..],
-                expected[..num_entries_per_slot * (nth - 1)],
-            );
-        }
-
-        // Try to find entries in the range [num_entries_per_slot * nth..num_entries_per_slot * (nth + 1)
-        // that don't exist in the ledger.
-        let extra_entries =
-            (num_entries_per_slot * nth) as u64..(num_entries_per_slot * (nth + 1)) as u64;
-        expected.extend(extra_entries);
-
-        // For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth]
-        for slot in 0..num_slots {
-            assert_eq!(
-                blocktree.find_missing_data_indexes(
-                    slot as u64,
-                    0,
-                    (num_entries_per_slot * (nth + 1)) as u64,
-                    num_entries_per_slot * (nth + 1),
-                ),
-                expected,
-            );
-        }
-    }
-
-    #[test]
-    pub fn test_no_missing_blob_indexes() {
-        let slot = 0;
-        let blocktree_path = get_tmp_ledger_path!();
-        let blocktree = Blocktree::open(&blocktree_path).unwrap();
-
-        // Write entries
-        let num_entries = 10;
-        let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
-
-        index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
-
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-        blocktree.write_blobs(blobs).unwrap();
-
-        let empty: Vec<u64> = vec![];
-        for i in 0..num_entries as u64 {
-            for j in 0..i {
-                assert_eq!(
-                    blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
-                    empty
-                );
-            }
-        }
-
-        drop(blocktree);
-        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
-    }
-
-    #[cfg(all(feature = "erasure", test))]
-    #[test]
-    pub fn test_try_erasure() {
-        // Setup the window
-        let offset = 0;
-        let num_blobs = NUM_DATA + 2;
-        let slot = 0;
-        let mut window = setup_window_ledger(offset, num_blobs, false, slot);
-        let end_index = (offset + num_blobs) % window.len();
-
-        // Test erasing a data block and an erasure block
-        let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
-
-        let erased_index = coding_start % window.len();
-
-        // Create a hole in the window
-        let erased_data = window[erased_index].data.clone();
-        let erased_coding = window[erased_index].coding.clone().unwrap();
-        window[erased_index].data = None;
-        window[erased_index].coding = None;
-
-        // Generate the blocktree from the window
-        let ledger_path = get_tmp_ledger_path!();
-        let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, false));
-
-        try_erasure(&blocktree, 0).expect("Expected successful erasure attempt");
-        window[erased_index].data = erased_data;
-
-        {
-            let data_blobs: Vec<_> = window[erased_index..end_index]
-                .iter()
-                .map(|entry| entry.data.clone().unwrap())
-                .collect();
-
-            let locks: Vec<_> = data_blobs.iter().map(|blob| blob.read().unwrap()).collect();
-
-            let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect();
-
-            let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap();
-
-            assert_eq!(
-                blocktree
-                    .get_slot_entries(
-                        0,
-                        erased_index as u64,
-                        Some((end_index - erased_index) as u64)
-                    )
-                    .unwrap(),
-                expected
-            );
-        }
-
-        let erased_coding_l = erased_coding.read().unwrap();
-        assert_eq!(
-            &blocktree
-                .get_coding_blob_bytes(slot, erased_index as u64)
-                .unwrap()
-                .unwrap()[BLOB_HEADER_SIZE..],
-            &erased_coding_l.data()[..erased_coding_l.size() as usize],
-        );
-    }
-
     #[test]
     fn test_process_blob() {
         let blocktree_path = get_tmp_ledger_path!();
diff --git a/core/src/erasure.rs b/core/src/erasure.rs
index b530283ee1..9b84dbbfdf 100644
--- a/core/src/erasure.rs
+++ b/core/src/erasure.rs
@@ -362,7 +362,7 @@ impl CodingGenerator {
 // Recover the missing data and coding blobs from the input ledger. Returns a vector
 // of the recovered missing data blobs and a vector of the recovered coding blobs
 pub fn recover(
-    blocktree: &Arc<Blocktree>,
+    blocktree: &Blocktree,
     slot: u64,
     start_idx: u64,
 ) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
@@ -498,23 +498,31 @@ fn categorize_blob(
 
 #[cfg(test)]
 pub mod test {
-    #[derive(Default, Clone)]
-    pub struct WindowSlot {
-        pub data: Option<SharedBlob>,
-        pub coding: Option<SharedBlob>,
-        pub leader_unknown: bool,
-    }
-
     use super::*;
     use crate::blocktree::get_tmp_ledger_path;
     use crate::blocktree::Blocktree;
     use crate::entry::{make_tiny_test_entries, EntrySlice};
-    use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
-    use rand::{thread_rng, Rng};
-    use solana_sdk::pubkey::Pubkey;
+    use crate::packet::{index_blobs, SharedBlob};
     use solana_sdk::signature::{Keypair, KeypairUtil};
-    use std::sync::Arc;
+
+    /// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set
+    /// Exists to be passed to `generate_blocktree_with_coding`
+    #[derive(Debug, Copy, Clone)]
+    pub struct ErasureSpec {
+        /// Which 16-blob erasure set this represents
+        pub set_index: usize,
+        pub num_data: usize,
+        pub num_coding: usize,
+    }
+
+    /// Specifies the contents of a slot
+    /// Exists to be passed to `generate_blocktree_with_coding`
+    #[derive(Debug, Clone)]
+    pub struct SlotSpec {
+        pub slot: u64,
+        pub set_specs: Vec<ErasureSpec>,
+    }
 
     #[test]
     fn test_coding() {
@@ -634,278 +642,192 @@ pub mod test {
         }
     }
 
-    // TODO: Temprorary function used in tests to generate a database ledger
-    // from the window (which is used to generate the erasure coding)
-    // until we also transition generate_coding() and BroadcastStage to use Blocktree
-    // Github issue: https://github.com/solana-labs/solana/issues/1899.
-    pub fn generate_blocktree_from_window(
-        ledger_path: &str,
-        window: &[WindowSlot],
-        use_random: bool,
-    ) -> Blocktree {
-        let blocktree =
-            Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
-        for slot in window {
-            if let Some(ref data) = slot.data {
-                // If we're using gibberish blobs, skip validation checks and insert
-                // directly into the ledger
-                if use_random {
-                    let data = data.read().unwrap();
-                    blocktree
-                        .put_data_blob_bytes(
-                            data.slot(),
-                            data.index(),
-                            &data.data[..data.data_size() as usize],
-                        )
-                        .expect("Expected successful put into data column of ledger");
-                } else {
-                    blocktree
-                        .write_shared_blobs(vec![data].into_iter())
-                        .unwrap();
+    #[test]
+    fn test_generate_blocktree_with_coding() {
+        let cases = vec![
+            (NUM_DATA, NUM_CODING, 7, 5),
+            (NUM_DATA - 6, NUM_CODING - 1, 5, 7),
+        ];
+
+        for (num_data, num_coding, num_slots, num_sets_per_slot) in cases {
+            let ledger_path = get_tmp_ledger_path!();
+
+            let specs = (0..num_slots)
+                .map(|slot| {
+                    let set_specs = (0..num_sets_per_slot)
+                        .map(|set_index| ErasureSpec {
+                            set_index,
+                            num_data,
+                            num_coding,
+                        })
+                        .collect();
+
+                    SlotSpec { slot, set_specs }
+                })
+                .collect::<Vec<_>>();
+
+            let blocktree = generate_blocktree_with_coding(&ledger_path, &specs);
+
+            for spec in specs.iter() {
+                let slot = spec.slot;
+
+                for erasure_spec in spec.set_specs.iter() {
+                    let set_index = erasure_spec.set_index as u64;
+                    let start_index = set_index * NUM_DATA as u64;
+
+                    for i in 0..erasure_spec.num_data as u64 {
+                        let opt_bytes = blocktree
+                            .get_data_blob_bytes(slot, start_index + i)
+                            .unwrap();
+                        assert!(opt_bytes.is_some());
+                    }
+
+                    for i in 0..erasure_spec.num_coding as u64 {
+                        let coding_start_index = start_index as usize + (NUM_DATA - NUM_CODING);
+                        let opt_bytes = blocktree
+                            .get_coding_blob_bytes(slot, coding_start_index as u64 + i)
+                            .unwrap();
+                        assert!(opt_bytes.is_some());
+                    }
                 }
             }
-            if let Some(ref coding) = slot.coding {
-                let coding_lock = coding.read().unwrap();
+            drop(blocktree);
+            Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
+        }
+    }
 
-                let index = coding_lock.index();
+    #[test]
+    fn test_blocktree_recover_basic() {
+        let ledger_path = get_tmp_ledger_path!();
 
-                let data_size = coding_lock.size();
+        // Missing 1 data blob
+        let spec = SlotSpec {
+            slot: 0,
+            set_specs: vec![ErasureSpec {
+                set_index: 0,
+                num_data: NUM_DATA - 1,
+                num_coding: 4,
+            }],
+        };
 
-                blocktree
-                    .put_coding_blob_bytes(
-                        coding_lock.slot(),
-                        index,
-                        &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
-                    )
-                    .unwrap();
+        let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec]);
+
+        let (recovered_data, recovered_coding) =
+            recover(&blocktree, 0, 0).expect("Expect successful recovery");
+
+        assert!(recovered_coding.is_empty());
+
+        assert!(recovered_data.len() == 1);
+
+        drop(blocktree);
+        Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
+    }
+
+    #[test]
+    fn test_blocktree_recover_basic2() {
+        let ledger_path = get_tmp_ledger_path!();
+
+        // Missing 1 data blob in [0, 16)
+        // [16..32) complete
+        let spec1 = SlotSpec {
+            slot: 0,
+            set_specs: vec![
+                ErasureSpec {
+                    set_index: 0,
+                    num_data: NUM_DATA - 1,
+                    num_coding: NUM_CODING,
+                },
+                ErasureSpec {
+                    set_index: 1,
+                    num_data: NUM_DATA,
+                    num_coding: NUM_CODING,
+                },
+            ],
+        };
+
+        // Missing 1 coding and 1 data blob
+        let spec2 = SlotSpec {
+            slot: 3,
+            set_specs: vec![ErasureSpec {
+                set_index: 3,
+                num_data: NUM_DATA - 1,
+                num_coding: NUM_CODING - 1,
+            }],
+        };
+
+        let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec1, spec2]);
+
+        let (recovered_data, recovered_coding) =
+            recover(&blocktree, 0, 0).expect("Expect successful recovery");
+
+        assert!(recovered_coding.is_empty());
+        assert_eq!(recovered_data.len(), 1);
+
+        let (recovered_data, recovered_coding) =
+            recover(&blocktree, 0, NUM_DATA as u64).expect("Expect successful recovery");
+
+        assert!(recovered_coding.is_empty());
+        assert!(recovered_data.is_empty());
+
+        let (recovered_data, recovered_coding) =
+            recover(&blocktree, 3, 3 * NUM_DATA as u64).expect("Expect successful recovery");
+
+        assert_eq!(recovered_coding.len(), 1);
+        assert_eq!(recovered_data.len(), 1);
+
+        drop(blocktree);
+        Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
+    }
+
+    /// Generates a ledger according to the given specs. Does not generate a valid ledger with
+    /// chaining, etc.
+    pub fn generate_blocktree_with_coding(ledger_path: &str, specs: &[SlotSpec]) -> Blocktree {
+        let blocktree = Blocktree::open(ledger_path).unwrap();
+
+        for spec in specs {
+            let slot = spec.slot;
+
+            for erasure_spec in spec.set_specs.iter() {
+                let set_index = erasure_spec.set_index as usize;
+                let start_index = set_index * NUM_DATA;
+
+                let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs();
+                index_blobs(
+                    &blobs,
+                    &Keypair::new().pubkey(),
+                    start_index as u64,
+                    slot,
+                    0,
+                );
+
+                let mut coding_generator = CodingGenerator::new();
+                let mut coding_blobs = coding_generator.next(&blobs).unwrap();
+
+                blobs.drain(erasure_spec.num_data..);
+                coding_blobs.drain(erasure_spec.num_coding..);
+
+                for shared_blob in blobs {
+                    let blob = shared_blob.read().unwrap();
+                    let size = blob.size() as usize + BLOB_HEADER_SIZE;
+                    blocktree
+                        .put_data_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
+                        .unwrap();
+                }
+
+                for shared_blob in coding_blobs {
+                    let blob = shared_blob.read().unwrap();
+                    let size = blob.size() as usize + BLOB_HEADER_SIZE;
+                    blocktree
+                        .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
+                        .unwrap();
+                }
             }
         }
         blocktree
     }
 
-    fn generate_coding(
-        id: &Pubkey,
-        window: &mut [WindowSlot],
-        receive_index: u64,
-        num_blobs: usize,
-        transmit_index_coding: &mut u64,
-    ) -> Result<()> {
-        // beginning of the coding blobs of the block that receive_index points into
-        let coding_index_start =
-            receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
-
-        let start_idx = receive_index as usize % window.len();
-        let mut block_start = start_idx - (start_idx % NUM_DATA);
-
-        loop {
-            let block_end = block_start + NUM_DATA;
-            if block_end > (start_idx + num_blobs) {
-                break;
-            }
-            info!(
-                "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
-                id, block_start, block_end, start_idx, num_blobs
-            );
-
-            let mut max_data_size = 0;
-
-            // find max_data_size, maybe bail if not all the data is here
-            for i in block_start..block_end {
-                let n = i % window.len();
-                trace!("{} window[{}] = {:?}", id, n, window[n].data);
-
-                if let Some(b) = &window[n].data {
-                    max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
-                } else {
-                    trace!("{} data block is null @ {}", id, n);
-                    return Ok(());
-                }
-            }
-
-            // round up to the nearest jerasure alignment
-            max_data_size = align!(max_data_size, JERASURE_ALIGN);
-
-            let mut data_blobs = Vec::with_capacity(NUM_DATA);
-            for i in block_start..block_end {
-                let n = i % window.len();
-
-                if let Some(b) = &window[n].data {
-                    // make sure extra bytes in each blob are zero-d out for generation of
-                    // coding blobs
-                    let mut b_wl = b.write().unwrap();
-                    for i in b_wl.meta.size..max_data_size {
-                        b_wl.data[i] = 0;
-                    }
-                    data_blobs.push(b);
-                }
-            }
-
-            // getting ready to do erasure coding, means that we're potentially
-            // going back in time, tell our caller we've inserted coding blocks
-            // starting at coding_index_start
-            *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
-
-            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
-            let coding_start = block_end - NUM_CODING;
-            for i in coding_start..block_end {
-                let n = i % window.len();
-                assert!(window[n].coding.is_none());
-
-                window[n].coding = Some(SharedBlob::default());
-
-                let coding = window[n].coding.clone().unwrap();
-                let mut coding_wl = coding.write().unwrap();
-                for i in 0..max_data_size {
-                    coding_wl.data[i] = 0;
-                }
-                // copy index and forward flag from the data blob
-                if let Some(data) = &window[n].data {
-                    let data_rl = data.read().unwrap();
-
-                    let index = data_rl.index();
-                    let slot = data_rl.slot();
-                    let id = data_rl.id();
-                    let should_forward = data_rl.should_forward();
-
-                    trace!(
-                        "{} copying index {} should_forward {:?} from data to coding",
-                        should_forward,
-                        index,
-                        should_forward
-                    );
-                    coding_wl.set_index(index);
-                    coding_wl.set_slot(slot);
-                    coding_wl.set_id(&id);
-                    coding_wl.forward(should_forward);
-                }
-                coding_wl.set_size(max_data_size);
-                coding_wl.set_coding();
-
-                coding_blobs.push(coding.clone());
-            }
-
-            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
-
-            let data_ptrs: Vec<_> = data_locks
-                .iter()
-                .enumerate()
-                .map(|(i, l)| {
-                    trace!("{} i: {} data: {}", id, i, l.data[0]);
-                    &l.data[..max_data_size]
-                })
-                .collect();
-
-            let mut coding_locks: Vec<_> =
-                coding_blobs.iter().map(|b| b.write().unwrap()).collect();
-
-            let mut coding_ptrs: Vec<_> = coding_locks
-                .iter_mut()
-                .enumerate()
-                .map(|(i, l)| {
-                    trace!("{} i: {} coding: {}", id, i, l.data[0],);
-                    &mut l.data_mut()[..max_data_size]
-                })
-                .collect();
-
-            generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
-            debug!(
-                "{} start_idx: {} data: {}:{} coding: {}:{}",
-                id, start_idx, block_start, block_end, coding_start, block_end
-            );
-            block_start = block_end;
-        }
-        Ok(())
-    }
-
-    pub fn setup_window_ledger(
-        offset: usize,
-        num_blobs: usize,
-        use_random_window: bool,
-        slot: u64,
-    ) -> Vec<WindowSlot> {
-        // Generate a window
-        let mut window = {
-            if use_random_window {
-                generate_window(offset, num_blobs, slot)
-            } else {
-                generate_entry_window(offset, num_blobs)
-            }
-        };
-
-        for slot in &window {
-            if let Some(blob) = &slot.data {
-                let blob_r = blob.read().unwrap();
-                assert!(!blob_r.is_coding());
-            }
-        }
-
-        // Generate the coding blocks
-        let mut index = (NUM_DATA + 2) as u64;
-        assert!(generate_coding(
-            &Pubkey::default(),
-            &mut window,
-            offset as u64,
-            num_blobs,
-            &mut index
-        )
-        .is_ok());
-        assert_eq!(index, (NUM_DATA - NUM_CODING) as u64);
-
-        // put junk in the tails, simulates re-used blobs
-        scramble_window_tails(&mut window, num_blobs);
-
-        window
-    }
-
-    const WINDOW_SIZE: usize = 64;
-    fn generate_window(offset: usize, num_blobs: usize, slot: u64) -> Vec<WindowSlot> {
-        let mut window = vec![
-            WindowSlot {
-                data: None,
-                coding: None,
-                leader_unknown: false,
-            };
-            WINDOW_SIZE
-        ];
-        let mut blobs = Vec::with_capacity(num_blobs);
-        for i in 0..num_blobs {
-            let b = SharedBlob::default();
-            let b_ = b.clone();
-            let mut w = b.write().unwrap();
-            // generate a random length, multiple of 4 between 8 and 32
-            let data_len = if i == 3 {
-                BLOB_DATA_SIZE
-            } else {
-                (thread_rng().gen_range(2, 8) * 4) + 1
-            };
-
-            w.set_size(data_len);
-
-            for k in 0..data_len {
-                w.data_mut()[k] = (k + i) as u8;
-            }
-
-            // overfill, simulates re-used blobs
-            for i in BLOB_HEADER_SIZE + data_len..BLOB_SIZE {
-                w.data[i] = thread_rng().gen();
-            }
-
-            blobs.push(b_);
-        }
-
-        // Make some dummy slots
-        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0);
-
-        for b in blobs {
-            let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
-
-            window[idx].data = Some(b);
-        }
-        window
-    }
-
     fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
         let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs();
@@ -913,153 +835,4 @@ pub mod test {
 
         blobs
     }
-
-    fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
-        let mut window = vec![
-            WindowSlot {
-                data: None,
-                coding: None,
-                leader_unknown: false,
-            };
-            WINDOW_SIZE
-        ];
-
-        let blobs = generate_test_blobs(offset, num_blobs);
-
-        for b in blobs.into_iter() {
-            let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
-
-            window[idx].data = Some(b);
-        }
-        window
-    }
-
-    fn scramble_window_tails(window: &mut [WindowSlot], num_blobs: usize) {
-        for i in 0..num_blobs {
-            if let Some(b) = &window[i].data {
-                let size = {
-                    let b_l = b.read().unwrap();
-                    b_l.meta.size
-                } as usize;
-
-                let mut b_l = b.write().unwrap();
-                for i in size..BLOB_SIZE {
-                    b_l.data[i] = thread_rng().gen();
-                }
-            }
-        }
-    }
-
-    // Remove a data block, test for successful recovery
-    #[test]
-    pub fn test_window_recover_basic() {
-        solana_logger::setup();
-
-        // Setup the window
-        let offset = 0;
-        let num_blobs = NUM_DATA + 2;
-        let mut window = setup_window_ledger(offset, num_blobs, true, 0);
-
-        // Test erasing a data block
-        let erase_offset = offset % window.len();
-
-        // Create a hole in the window
-        let refwindow = window[erase_offset].data.clone();
-        window[erase_offset].data = None;
-
-        // Generate the blocktree from the window
-        let ledger_path = get_tmp_ledger_path!();
-        let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, true));
-
-        // Recover it from coding
-        let (recovered_data, recovered_coding) = recover(&blocktree, 0, offset as u64)
-            .expect("Expected successful recovery of erased blobs");
-
-        assert!(recovered_coding.is_empty());
-        {
-            // Check the result, block is here to drop locks
-            let recovered_blob = recovered_data
-                .first()
-                .expect("Expected recovered data blob to exist");
-            let ref_l = refwindow.clone().unwrap();
-            let ref_l2 = ref_l.read().unwrap();
-            let result = recovered_blob.read().unwrap();
-
-            assert_eq!(result.size(), ref_l2.size());
-            assert_eq!(
-                result.data[..ref_l2.data_size() as usize],
-                ref_l2.data[..ref_l2.data_size() as usize]
-            );
-            assert_eq!(result.index(), offset as u64);
-            assert_eq!(result.slot(), 0 as u64);
-        }
-        drop(blocktree);
-        Blocktree::destroy(&ledger_path)
-            .expect("Expected successful destruction of database ledger");
-    }
-
-    // Remove a data and coding block, test for successful recovery
-    #[test]
-    pub fn test_window_recover_basic2() {
-        solana_logger::setup();
-
-        // Setup the window
-        let offset = 0;
-        let num_blobs = NUM_DATA + 2;
-        let mut window = setup_window_ledger(offset, num_blobs, true, 0);
-
-        // Tests erasing a coding block and a data block
-        let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
-        let erase_offset = coding_start % window.len();
-
-        // Create a hole in the window
-        let refwindowdata = window[erase_offset].data.clone();
-        let refwindowcoding = window[erase_offset].coding.clone();
-        window[erase_offset].data = None;
-        window[erase_offset].coding = None;
-        let ledger_path = get_tmp_ledger_path!();
-        let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, true));
-
-        // Recover it from coding
-        let (recovered_data, recovered_coding) = recover(&blocktree, 0, offset as u64)
-            .expect("Expected successful recovery of erased blobs");
-
-        {
-            let recovered_data_blob = recovered_data
-                .first()
-                .expect("Expected recovered data blob to exist");
-
-            let recovered_coding_blob = recovered_coding
-                .first()
-                .expect("Expected recovered coding blob to exist");
-
-            // Check the recovered data result
-            let ref_l = refwindowdata.clone().unwrap();
-            let ref_l2 = ref_l.read().unwrap();
-            let result = recovered_data_blob.read().unwrap();
-
-            assert_eq!(result.size(), ref_l2.size());
-            assert_eq!(
-                result.data[..ref_l2.data_size() as usize],
-                ref_l2.data[..ref_l2.data_size() as usize]
-            );
-            assert_eq!(result.index(), coding_start as u64);
-            assert_eq!(result.slot(), 0 as u64);
-
-            // Check the recovered erasure result
-            let ref_l = refwindowcoding.clone().unwrap();
-            let ref_l2 = ref_l.read().unwrap();
-            let result = recovered_coding_blob.read().unwrap();
-
-            assert_eq!(result.size(), ref_l2.size());
-            assert_eq!(
-                result.data()[..ref_l2.size() as usize],
-                ref_l2.data()[..ref_l2.size() as usize]
-            );
-            assert_eq!(result.index(), coding_start as u64);
-            assert_eq!(result.slot(), 0 as u64);
-        }
-        drop(blocktree);
-        Blocktree::destroy(&ledger_path)
-            .expect("Expected successful destruction of database ledger");
-    }
 }