From 9c30bddb881d32da6d4f5a2dff8b903ab3d21bbf Mon Sep 17 00:00:00 2001
From: carllin
Date: Wed, 5 Dec 2018 12:47:19 -0800
Subject: [PATCH] Rocks db erasure decoding (#1900)

* Change erasure to consume new RocksDb window
* Change tests for erasure
* Remove erasure from window
* Integrate erasure decoding back into window
* Remove corrupted blobs from ledger
* Replace Erasure result with result module's Result
---
 src/db_ledger.rs |  24 +-
 src/db_window.rs | 113 +++++++-
 src/erasure.rs   | 667 ++++++++++++++++++++++++----------------
 src/packet.rs    |  10 +
 src/window.rs    |  10 -
 5 files changed, 474 insertions(+), 350 deletions(-)

diff --git a/src/db_ledger.rs b/src/db_ledger.rs
index 3d0c7ab431..e94c812168 100644
--- a/src/db_ledger.rs
+++ b/src/db_ledger.rs
@@ -175,6 +175,11 @@ impl LedgerColumnFamilyRaw for DataCf {
 pub struct ErasureCf {}
 
 impl ErasureCf {
+    pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> {
+        let key = Self::key(slot_height, index);
+        self.delete(db, &key)
+    }
+
     pub fn get_by_slot_index(
         &self,
         db: &DB,
@@ -270,34 +275,37 @@ impl DbLedger {
         Ok(())
     }
 
-    pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
+    pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator,
         I::Item: Borrow<SharedBlob>,
     {
+        let mut entries = vec![];
         for b in shared_blobs {
             let bl = b.borrow().read().unwrap();
             let index = bl.index()?;
             let key = DataCf::key(slot, index);
-            self.insert_data_blob(&key, &*bl)?;
+            let new_entries = self.insert_data_blob(&key, &*bl)?;
+            entries.extend(new_entries);
         }
-
-        Ok(())
+        Ok(entries)
     }
 
-    pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
+    pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator<Item = &'a Blob>,
     {
+        let mut entries = vec![];
         for blob in blobs.into_iter() {
             let index = blob.index()?;
             let key = DataCf::key(slot, index);
-            self.insert_data_blob(&key, blob)?;
+            let new_entries = self.insert_data_blob(&key, blob)?;
+            entries.extend(new_entries);
         }
-        Ok(())
+        Ok(entries)
     }
 
-    pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
+    pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator,
         I::Item: Borrow<Entry>,
diff --git a/src/db_window.rs b/src/db_window.rs
index 2804e7a50b..c19343d249 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -3,6 +3,8 @@ use cluster_info::ClusterInfo;
 use counter::Counter;
 use db_ledger::*;
 use entry::Entry;
+#[cfg(feature = "erasure")]
+use erasure;
 use leader_scheduler::LeaderScheduler;
 use log::Level;
 use packet::{SharedBlob, BLOB_HEADER_SIZE};
@@ -140,6 +142,12 @@ pub fn find_missing_indexes(
     let mut prev_index = start_index;
     'outer: loop {
         if !db_iterator.valid() {
+            for i in prev_index..end_index {
+                missing_indexes.push(i);
+                if missing_indexes.len() == max_missing {
+                    break;
+                }
+            }
             break;
         }
         let current_key = db_iterator.key().expect("Expect a valid key");
@@ -303,7 +311,19 @@ pub fn process_blob(
         db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
     };
 
-    // TODO: Once erasure is fixed, readd that logic here
+    #[cfg(feature = "erasure")]
+    {
+        // If write_shared_blobs() of these recovered blobs fails, don't return an error,
+        // because consumed_entries might be nonempty from earlier, and tick height needs to
+        // be updated. Hopefully we can recover these blobs successfully next time.
+        if let Err(e) = try_erasure(db_ledger, slot, consume_queue) {
+            trace!(
+                "erasure::recover failed to write recovered coding blobs. 
Err: {:?}", + e + ); + } + } + for entry in &consumed_entries { *tick_height += entry.is_tick() as u64; } @@ -352,9 +372,39 @@ pub fn calculate_max_repair_entry_height( } } +#[cfg(feature = "erasure")] +fn try_erasure(db_ledger: &mut DbLedger, slot: u64, consume_queue: &mut Vec) -> Result<()> { + let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?; + if let Some(meta) = meta { + let (data, coding) = erasure::recover(db_ledger, slot, meta.consumed)?; + for c in coding { + let cl = c.read().unwrap(); + let erasure_key = + ErasureCf::key(slot, cl.index().expect("Recovered blob must set index")); + let size = cl.size().expect("Recovered blob must set size"); + db_ledger.erasure_cf.put( + &db_ledger.db, + &erasure_key, + &cl.data[..BLOB_HEADER_SIZE + size], + )?; + } + + let entries = db_ledger.write_shared_blobs(slot, data)?; + consume_queue.extend(entries); + } + + Ok(()) +} + #[cfg(test)] mod test { use super::*; + #[cfg(all(feature = "erasure", test))] + use entry::reconstruct_entries_from_blobs; + #[cfg(all(feature = "erasure", test))] + use erasure::test::{generate_db_ledger_from_window, setup_window_ledger}; + #[cfg(all(feature = "erasure", test))] + use erasure::{NUM_CODING, NUM_DATA}; use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use rocksdb::{Options, DB}; @@ -572,6 +622,18 @@ mod test { vec![1], ); + // Test with end indexes that are greater than the last item in the ledger + let mut expected: Vec = (1..gap).collect(); + expected.push(gap + 1); + assert_eq!( + find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap + 2) as usize), + expected, + ); + assert_eq!( + find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap - 1) as usize), + &expected[..expected.len() - 1], + ); + for i in 0..num_entries as u64 { for j in 0..i { let expected: Vec = (j..i) @@ -626,4 +688,53 @@ mod test { DB::destroy(&Options::default(), &db_ledger_path) .expect("Expected successful database destruction"); } + + #[cfg(all(feature = "erasure", test))] + #[test] + pub fn test_try_erasure() { + // Setup the window + let offset = 0; + let num_blobs = NUM_DATA + 2; + let slot_height = DEFAULT_SLOT_HEIGHT; + let mut window = setup_window_ledger(offset, num_blobs, false, slot_height); + let end_index = (offset + num_blobs) % window.len(); + + // Test erasing a data block and an erasure block + let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING); + + let erase_offset = coding_start % window.len(); + + // Create a hole in the window + let erased_data = window[erase_offset].data.clone(); + let erased_coding = window[erase_offset].coding.clone().unwrap(); + window[erase_offset].data = None; + window[erase_offset].coding = None; + + // Generate the db_ledger from the window + let ledger_path = get_tmp_ledger_path("test_try_erasure"); + let mut db_ledger = + generate_db_ledger_from_window(&ledger_path, &window, slot_height, false); + + let mut consume_queue = vec![]; + try_erasure(&mut db_ledger, slot_height, &mut consume_queue) + .expect("Expected successful erasure attempt"); + window[erase_offset].data = erased_data; + + let data_blobs: Vec<_> = window[erase_offset..end_index] + .iter() + .map(|slot| slot.data.clone().unwrap()) + .collect(); + let (expected, _) = reconstruct_entries_from_blobs(data_blobs).unwrap(); + assert_eq!(consume_queue, expected); + + let erased_coding_l = erased_coding.read().unwrap(); + assert_eq!( + &db_ledger + .erasure_cf + .get_by_slot_index(&db_ledger.db, 
slot_height, erase_offset as u64)
+                .unwrap()
+                .unwrap()[BLOB_HEADER_SIZE..],
+            &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
+        );
+    }
 }
diff --git a/src/erasure.rs b/src/erasure.rs
index 42f22ad4ae..79af21a946 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -1,9 +1,11 @@
 // Support erasure coding
-use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
+use db_ledger::DbLedger;
+use db_window::{find_missing_coding_indexes, find_missing_data_indexes};
+use packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
+use result::{Error, Result};
 use solana_sdk::pubkey::Pubkey;
 use std::cmp;
-use std::mem;
-use std::result;
+use std::sync::{Arc, RwLock};
 use window::WindowSlot;
 
 //TODO(sakridge) pick these values
@@ -25,10 +27,9 @@ pub enum ErasureError {
     DecodeError,
     EncodeError,
     InvalidBlockSize,
+    InvalidBlobData,
 }
 
-pub type Result<T> = result::Result<T, ErasureError>;
-
 // k = number of data devices
 // m = number of coding devices
 // w = word size
@@ -90,7 +91,7 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul
                 block.len(),
                 block_len
             );
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         data_arg.push(block.as_ptr());
     }
@@ -102,7 +103,7 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul
                 block.len(),
                 block_len
             );
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         coding_arg.push(block.as_mut_ptr());
     }
@@ -140,7 +141,7 @@ pub fn decode_blocks(
     let mut coding_arg: Vec<*mut u8> = Vec::new();
     for x in coding.iter_mut() {
         if x.len() != block_len {
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         coding_arg.push(x.as_mut_ptr());
     }
@@ -149,7 +150,7 @@ pub fn decode_blocks(
     let mut data_arg: Vec<*mut u8> = Vec::new();
     for x in data.iter_mut() {
         if x.len() != block_len {
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
        }
         data_arg.push(x.as_mut_ptr());
     }
@@ -172,7 +173,7 @@ pub fn decode_blocks(
     }
     trace!("");
     if ret < 0 {
-        return Err(ErasureError::DecodeError);
+        return Err(Error::ErasureError(ErasureError::DecodeError));
     }
     Ok(())
 }
@@ -256,8 +257,6 @@ pub fn generate_coding(
     // round up to the nearest jerasure alignment
     max_data_size = align!(max_data_size, JERASURE_ALIGN);
 
-    trace!("{} max_data_size: {}", id, max_data_size);
-
     let mut data_blobs = Vec::with_capacity(NUM_DATA);
     for i in block_start..block_end {
         let n = i % window.len();
@@ -311,7 +310,7 @@ pub fn generate_coding(
         }
         coding_wl.set_size(max_data_size);
         if coding_wl.set_coding().is_err() {
-            return Err(ErasureError::EncodeError);
+            return Err(Error::ErasureError(ErasureError::EncodeError));
         }
 
         coding_blobs.push(coding.clone());
@@ -347,199 +346,123 @@ pub fn generate_coding(
     Ok(())
 }
 
-// examine the window slot at idx returns
-// true if slot is empty
-// true if slot is stale (i.e. 
has the wrong index), old blob is flushed
-// false if slot has a blob with the right index
-fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option<SharedBlob>, c_or_d: &str) -> bool {
-    if let Some(blob) = window_slot.take() {
-        let blob_idx = blob.read().unwrap().index().unwrap();
-        if blob_idx == idx {
-            trace!("recover {}: idx: {} good {}", id, idx, c_or_d);
-            // put it back
-            mem::replace(window_slot, Some(blob));
-            false
-        } else {
-            trace!(
-                "recover {}: idx: {} old {} {}, recycling",
-                id,
-                idx,
-                c_or_d,
-                blob_idx,
-            );
-            true
-        }
-    } else {
-        trace!("recover {}: idx: {} None {}", id, idx, c_or_d);
-        // nothing there
-        true
-    }
-}
-
-// examine the window beginning at block_start for missing or
-// stale (based on block_start_idx) blobs
-// if a blob is stale, remove it from the window slot
-// side effect: block will be cleaned of old blobs
-fn find_missing(
-    id: &Pubkey,
-    block_start_idx: u64,
-    block_start: usize,
-    window: &mut [WindowSlot],
-) -> (usize, usize) {
-    let mut data_missing = 0;
-    let mut coding_missing = 0;
-    let block_end = block_start + NUM_DATA;
-    let coding_start = block_start + NUM_DATA - NUM_CODING;
-
-    // count missing blobs in the block
-    for i in block_start..block_end {
-        let idx = (i - block_start) as u64 + block_start_idx;
-        let n = i % window.len();
-
-        if is_missing(id, idx, &mut window[n].data, "data") {
-            data_missing += 1;
-        }
-
-        if i >= coding_start && is_missing(id, idx, &mut window[n].coding, "coding") {
-            coding_missing += 1;
-        }
-    }
-    (data_missing, coding_missing)
-}
-
-// Recover a missing block into window
-// missing blocks should be None or old...
-// If not enough coding or data blocks are present to restore
-// any of the blocks, the block is skipped.
-// Side effect: old blobs in a block are None'd
-pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize) -> Result<()> {
-    let block_start = start - (start % NUM_DATA);
+// Recover the missing data and coding blobs from the input ledger. Returns a vector
+// of the recovered missing data blobs and a vector of the recovered coding blobs
+pub fn recover(
+    db_ledger: &mut DbLedger,
+    slot: u64,
+    start_idx: u64,
+) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
     let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
 
-    debug!("start: {} block_start: {}", start, block_start);
+    debug!("block_start_idx: {}", block_start_idx);
 
-    let coding_start = block_start + NUM_DATA - NUM_CODING;
-    let block_end = block_start + NUM_DATA;
+    let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64;
+    let block_end_idx = block_start_idx + NUM_DATA as u64;
     trace!(
-        "recover {}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}",
-        id,
-        block_start_idx,
-        block_start,
-        coding_start,
-        block_end
+        "recover: coding_start_idx: {} block_end_idx: {}",
+        coding_start_idx,
+        block_end_idx
     );
 
-    let (data_missing, coding_missing) = find_missing(id, block_start_idx, block_start, window);
+    let data_missing =
+        find_missing_data_indexes(slot, db_ledger, block_start_idx, block_end_idx, NUM_DATA).len();
+    let coding_missing =
+        find_missing_coding_indexes(slot, db_ledger, coding_start_idx, block_end_idx, NUM_CODING)
+            .len();
 
-    // if we're not missing data, or if we have too much missin but have enough coding
+    // if we're not missing data, or if we have too much missing but have enough coding
     if data_missing == 0 {
         // nothing to do... 
-        return Ok(());
+        return Ok((vec![], vec![]));
     }
     if (data_missing + coding_missing) > NUM_CODING {
         trace!(
-            "recover {}: start: {} skipping recovery data: {} coding: {}",
-            id,
-            block_start,
+            "recover: start: {} skipping recovery data: {} coding: {}",
+            block_start_idx,
             data_missing,
             coding_missing
         );
         // nothing to do...
-        return Err(ErasureError::NotEnoughBlocksToDecode);
+        return Err(Error::ErasureError(ErasureError::NotEnoughBlocksToDecode));
     }
 
     trace!(
-        "recover {}: recovering: data: {} coding: {}",
-        id,
+        "recover: recovering: data: {} coding: {}",
         data_missing,
         coding_missing
     );
+
     let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
-    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
     let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
-    let mut meta = None;
+
+    let mut missing_data: Vec<SharedBlob> = vec![];
+    let mut missing_coding: Vec<SharedBlob> = vec![];
     let mut size = None;
 
-    // add the data blobs we have into recovery blob vector
-    for i in block_start..block_end {
-        let j = i % window.len();
+    // Add the data blobs we have into the recovery vector, mark the missing ones
+    for i in block_start_idx..block_end_idx {
+        let result = db_ledger
+            .data_cf
+            .get_by_slot_index(&db_ledger.db, slot, i)?;
 
-        if let Some(b) = window[j].data.clone() {
-            if meta.is_none() {
-                meta = Some(b.read().unwrap().meta.clone());
-                trace!("recover {} meta at {} {:?}", id, j, meta);
-            }
-            blobs.push(b);
-        } else {
-            let n = SharedBlob::default();
-            window[j].data = Some(n.clone());
-            // mark the missing memory
-            blobs.push(n);
-            erasures.push((i - block_start) as i32);
-        }
+        categorize_blob(
+            &result,
+            &mut blobs,
+            &mut missing_data,
+            &mut erasures,
+            (i - block_start_idx) as i32,
+        )?;
     }
-    for i in coding_start..block_end {
-        let j = i % window.len();
-        if let Some(b) = window[j].coding.clone() {
+
+    // Add the coding blobs we have into the recovery vector, mark the missing ones
+    for i in coding_start_idx..block_end_idx {
+        let result = db_ledger
+            .erasure_cf
+            .get_by_slot_index(&db_ledger.db, slot, i)?;
+
+        categorize_blob(
+            &result,
+            &mut blobs,
+            &mut missing_coding,
+            &mut erasures,
+            ((i - coding_start_idx) + NUM_DATA as u64) as i32,
+        )?;
+
+        if let Some(b) = result {
             if size.is_none() {
-                size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE);
-                trace!(
-                    "{} recover size {} from {}",
-                    id,
-                    size.unwrap(),
-                    i as u64 + block_start_idx
-                );
+                size = Some(b.len() - BLOB_HEADER_SIZE);
             }
-            blobs.push(b);
-        } else {
-            let n = SharedBlob::default();
-            window[j].coding = Some(n.clone());
-            //mark the missing memory
-            blobs.push(n);
-            erasures.push(((i - coding_start) + NUM_DATA) as i32);
         }
     }
 
-    // now that we have size (from coding), zero out data blob tails
+    // Due to the check (data_missing + coding_missing) > NUM_CODING earlier in this
+    // function, we know at least one coding block must exist, so "size" will not
+    // remain None after the processing below. 
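+    // (recover() returned early above when data_missing == 0 and bailed out when
+    // data_missing + coding_missing > NUM_CODING, so at least one of the NUM_CODING
+    // coding blobs for this set was found in the ledger and recorded a size.)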
     let size = size.unwrap();
-    for i in block_start..block_end {
-        let j = i % window.len();
-
-        if let Some(b) = &window[j].data {
-            let mut b_wl = b.write().unwrap();
-            for i in b_wl.meta.size..size {
-                b_wl.data[i] = 0;
-            }
-        }
-    }
-
     // marks end of erasures
     erasures.push(-1);
-    trace!("erasures[]: {} {:?} data_size: {}", id, erasures, size,);
-    //lock everything for write
-    for b in &blobs {
-        locks.push(b.write().unwrap());
-    }
+    trace!("erasures[]:{:?} data_size: {}", erasures, size,);
 
+    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
     {
         let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
         let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
+
+        for b in &blobs {
+            locks.push(b.write().unwrap());
+        }
+
         for (i, l) in locks.iter_mut().enumerate() {
             if i < NUM_DATA {
-                trace!("{} pushing data: {}", id, i);
                 data_ptrs.push(&mut l.data[..size]);
             } else {
-                trace!("{} pushing coding: {}", id, i);
                 coding_ptrs.push(&mut l.data_mut()[..size]);
             }
         }
-        trace!(
-            "{} coding_ptrs.len: {} data_ptrs.len {}",
-            id,
-            coding_ptrs.len(),
-            data_ptrs.len()
-        );
+
+        // Decode the blocks
         decode_blocks(
             data_ptrs.as_mut_slice(),
             coding_ptrs.as_mut_slice(),
@@ -547,9 +470,9 @@ pub fn recover(
         )?;
     }
 
-    let meta = meta.unwrap();
+    // Create the missing blobs from the reconstructed data
     let mut corrupt = false;
-    // repopulate header data size from recovered blob contents
+
     for i in &erasures[..erasures.len() - 1] {
         let n = *i as usize;
         let mut idx = n as u64 + block_start_idx;
@@ -559,48 +482,83 @@ pub fn recover(
             data_size = locks[n].data_size().unwrap() as usize;
             data_size -= BLOB_HEADER_SIZE;
             if data_size > BLOB_DATA_SIZE {
-                error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size);
+                error!("corrupt data blob[{}] data_size: {}", idx, data_size);
                 corrupt = true;
+                break;
             }
         } else {
             data_size = size;
             idx -= NUM_CODING as u64;
+            locks[n].set_slot(slot).unwrap();
             locks[n].set_index(idx).unwrap();
 
             if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
-                error!(
-                    "{} corrupt coding blob[{}] data_size: {}",
-                    id, idx, data_size
-                );
+                error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
                 corrupt = true;
+                break;
             }
         }
 
-        locks[n].meta = meta.clone();
         locks[n].set_size(data_size);
         trace!(
-            "{} erasures[{}] ({}) size: {} data[0]: {}",
-            id,
+            "erasures[{}] ({}) size: {} data[0]: {}",
             *i,
             idx,
             data_size,
             locks[n].data()[0]
        );
     }
-    assert!(!corrupt, " {} ", id);
+
+    if corrupt {
+        // Remove the corrupted coding blobs so there's no effort wasted in trying to
+        // reconstruct the blobs again
+        for i in coding_start_idx..block_end_idx {
+            db_ledger
+                .erasure_cf
+                .delete_by_slot_index(&db_ledger.db, slot, i)?;
+        }
+        return Ok((vec![], vec![]));
+    }
+
+    Ok((missing_data, missing_coding))
+}
+
+fn categorize_blob(
+    get_blob_result: &Option<Vec<u8>>,
+    blobs: &mut Vec<SharedBlob>,
+    missing: &mut Vec<SharedBlob>,
+    erasures: &mut Vec<i32>,
+    erasure_index: i32,
+) -> Result<()> {
+    match get_blob_result {
+        Some(b) => {
+            if b.len() <= BLOB_HEADER_SIZE || b.len() > BLOB_SIZE {
+                return Err(Error::ErasureError(ErasureError::InvalidBlobData));
+            }
+            blobs.push(Arc::new(RwLock::new(Blob::new(&b))));
+        }
+        None => {
+            // Mark the missing memory
+            erasures.push(erasure_index);
+            let b = SharedBlob::default();
+            blobs.push(b.clone());
+            missing.push(b);
+        }
+    }
 
     Ok(())
 }
 
 #[cfg(test)]
-mod test {
-    use erasure;
+pub mod test {
+    use super::*;
+    use db_ledger::{DbLedger, 
DEFAULT_SLOT_HEIGHT};
+    use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
     use logger;
-    use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
+    use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
     use rand::{thread_rng, Rng};
     use solana_sdk::pubkey::Pubkey;
     use solana_sdk::signature::{Keypair, KeypairUtil};
-    // use std::sync::{Arc, RwLock};
     use window::WindowSlot;
 
     #[test]
@@ -618,10 +576,8 @@ mod test {
         let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
 
         assert!(
-            erasure::generate_coding_blocks(
-                coding_blocks_slices.as_mut_slice(),
-                v_slices.as_slice(),
-            ).is_ok()
+            generate_coding_blocks(coding_blocks_slices.as_mut_slice(), v_slices.as_slice(),)
+                .is_ok()
         );
     }
     trace!("coding blocks:");
@@ -639,7 +595,7 @@ mod test {
         let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
 
         assert!(
-            erasure::decode_blocks(
+            decode_blocks(
                 v_slices.as_mut_slice(),
                 coding_blocks_slices.as_mut_slice(),
                 erasures.as_slice(),
@@ -654,45 +610,107 @@ mod test {
         assert_eq!(v_orig, vs[0]);
     }
 
-    fn print_window(window: &[WindowSlot]) {
-        for (i, w) in window.iter().enumerate() {
-            print!("window({:>w$}): ", i, w = 2);
-            if w.data.is_some() {
-                let window_l1 = w.data.clone().unwrap();
-                let window_l2 = window_l1.read().unwrap();
-                print!(
-                    "data index: {:?} meta.size: {} data: ",
-                    window_l2.index(),
-                    window_l2.meta.size
-                );
-                for i in 0..64 {
-                    print!("{:>w$} ", window_l2.data()[i], w = 3);
-                }
-            } else {
-                print!("data null ");
-            }
-            println!();
-            print!("window({:>w$}): ", i, w = 2);
-            if w.coding.is_some() {
-                let window_l1 = w.coding.clone().unwrap();
-                let window_l2 = window_l1.read().unwrap();
-                print!(
-                    "coding index: {:?} meta.size: {} data: ",
-                    window_l2.index(),
-                    window_l2.meta.size
-                );
-                for i in 0..8 {
-                    print!("{:>w$} ", window_l2.data()[i], w = 3);
-                }
-            } else {
-                print!("coding null");
-            }
-            println!();
-        }
-    }
+    // TODO: Temporary function used in tests to generate a database ledger
+    // from the window (which is used to generate the erasure coding)
+    // until we also transition generate_coding() and BroadcastStage to use RocksDb.
+    // Github issue: https://github.com/solana-labs/solana/issues/1899. 
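+    //
+    // Typical use from a test (see test_window_recover_basic below): build a window
+    // with setup_window_ledger(), punch holes in it by setting slots to None, then
+    // materialize it with generate_db_ledger_from_window() and run recover() or
+    // try_erasure() against the resulting DbLedger.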
+    pub fn generate_db_ledger_from_window(
+        ledger_path: &str,
+        window: &[WindowSlot],
+        slot_height: u64,
+        use_random: bool,
+    ) -> DbLedger {
+        let mut db_ledger =
+            DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
+        for slot in window {
+            if let Some(ref data) = slot.data {
+                // If we're using gibberish blobs, skip validation checks and insert
+                // directly into the ledger
+                if use_random {
+                    let data_l = data.read().unwrap();
+                    db_ledger
+                        .data_cf
+                        .put_by_slot_index(
+                            &db_ledger.db,
+                            slot_height,
+                            data_l.index().unwrap(),
+                            &data_l.data[..data_l.data_size().unwrap() as usize],
+                        ).expect("Expected successful put into data column of ledger");
+                } else {
+                    db_ledger
+                        .write_shared_blobs(slot_height, vec![data].into_iter())
+                        .unwrap();
+                }
+            }
+
+            if let Some(ref coding) = slot.coding {
+                let coding_lock = coding.read().unwrap();
+
+                let index = coding_lock
+                    .index()
+                    .expect("Expected coding blob to have valid index");
+
+                let data_size = coding_lock
+                    .size()
+                    .expect("Expected coding blob to have valid data size");
+
+                db_ledger
+                    .erasure_cf
+                    .put_by_slot_index(
+                        &db_ledger.db,
+                        slot_height,
+                        index,
+                        &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
+                    ).unwrap();
+            }
+        }
+
+        db_ledger
+    }
+
+    pub fn setup_window_ledger(
+        offset: usize,
+        num_blobs: usize,
+        use_random_window: bool,
+        slot: u64,
+    ) -> Vec<WindowSlot> {
+        // Generate a window
+        let mut window = {
+            if use_random_window {
+                generate_window(offset, num_blobs, slot)
+            } else {
+                generate_entry_window(offset, num_blobs)
+            }
+        };
+
+        for slot in &window {
+            if let Some(blob) = &slot.data {
+                let blob_r = blob.read().unwrap();
+                assert!(!blob_r.is_coding());
+            }
+        }
+
+        // Generate the coding blocks
+        let mut index = (NUM_DATA + 2) as u64;
+        assert!(
+            generate_coding(
+                &Pubkey::default(),
+                &mut window,
+                offset as u64,
+                num_blobs,
+                &mut index
+            ).is_ok()
+        );
+        assert_eq!(index, (NUM_DATA - NUM_CODING) as u64);
+
+        // put junk in the tails, simulates re-used blobs
+        scramble_window_tails(&mut window, num_blobs);
+
+        window
+    }
 
     const WINDOW_SIZE: usize = 64;
-    fn generate_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
+    fn generate_window(offset: usize, num_blobs: usize, slot: u64) -> Vec<WindowSlot> {
         let mut window = vec![
             WindowSlot {
                 data: None,
@@ -728,7 +746,7 @@ mod test {
             blobs.push(b_);
         }
 
-        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 13);
+        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot);
 
         for b in blobs {
             let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
@@ -737,6 +755,27 @@ mod test {
         window
     }
 
+    fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
+        let mut window = vec![
+            WindowSlot {
+                data: None,
+                coding: None,
+                leader_unknown: false,
+            };
+            WINDOW_SIZE
+        ];
+        let entries = make_tiny_test_entries(num_blobs);
+        let blobs = entries.to_blobs();
+
+        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 13);
+        for b in blobs.into_iter() {
+            let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
+
+            window[idx].data = Some(b);
+        }
+        window
+    }
+
     fn 
scramble_window_tails(window: &mut [WindowSlot], num_blobs: usize) { for i in 0..num_blobs { if let Some(b) = &window[i].data { @@ -753,156 +792,122 @@ mod test { } } + // Remove a data block, test for successful recovery #[test] pub fn test_window_recover_basic() { logger::setup(); - // Generate a window + + // Setup the window let offset = 0; - let num_blobs = erasure::NUM_DATA + 2; - let mut window = generate_window(WINDOW_SIZE, num_blobs); - - for slot in &window { - if let Some(blob) = &slot.data { - let blob_r = blob.read().unwrap(); - assert!(!blob_r.is_coding()); - } - } - - println!("** after-gen-window:"); - print_window(&window); - - // Generate the coding blocks - let mut index = (erasure::NUM_DATA + 2) as u64; - let id = Pubkey::default(); - assert!( - erasure::generate_coding(&id, &mut window, offset as u64, num_blobs, &mut index) - .is_ok() - ); - assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64); - - println!("** after-gen-coding:"); - print_window(&window); + let num_blobs = NUM_DATA + 2; + let mut window = setup_window_ledger(offset, num_blobs, true, DEFAULT_SLOT_HEIGHT); println!("** whack data block:"); - // test erasing a data block - let erase_offset = offset; + // Test erasing a data block + let erase_offset = offset % window.len(); + // Create a hole in the window let refwindow = window[erase_offset].data.clone(); window[erase_offset].data = None; - print_window(&window); - // put junk in the tails, simulates re-used blobs - scramble_window_tails(&mut window, num_blobs); + // Generate the db_ledger from the window + let ledger_path = get_tmp_ledger_path("test_window_recover_basic"); + let mut db_ledger = + generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true); // Recover it from coding - assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); - println!("** after-recover:"); - print_window(&window); + let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64) + .expect("Expected successful recovery of erased blobs"); + assert!(recovered_coding.is_empty()); { // Check the result, block is here to drop locks - - let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); + let recovered_blob = recovered_data + .first() + .expect("Expected recovered data blob to exist"); let ref_l = refwindow.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); + let result = recovered_blob.read().unwrap(); - assert_eq!(window_l2.meta.size, ref_l2.meta.size); + assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); assert_eq!( - window_l2.data[..window_l2.meta.size], - ref_l2.data[..window_l2.meta.size] - ); - assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); - assert_eq!(window_l2.meta.port, ref_l2.meta.port); - assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!( - window_l2.index().unwrap(), - (erase_offset + WINDOW_SIZE) as u64 + result.data[..ref_l2.data_size().unwrap() as usize], + ref_l2.data[..ref_l2.data_size().unwrap() as usize] ); + assert_eq!(result.index().unwrap(), offset as u64); + assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); } + drop(db_ledger); + DbLedger::destroy(&ledger_path) + .expect("Expected successful destruction of database ledger"); + } + + // Remove a data and coding block, test for successful recovery + #[test] + pub fn test_window_recover_basic2() { + logger::setup(); + + // Setup the window + let offset = 0; + let num_blobs = NUM_DATA + 2; + let mut window = 
setup_window_ledger(offset, num_blobs, true, DEFAULT_SLOT_HEIGHT); println!("** whack coding block and data block"); - // tests erasing a coding block and a data block - let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING; + // Tests erasing a coding block and a data block + let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING); + let erase_offset = coding_start % window.len(); + // Create a hole in the window - let refwindow = window[erase_offset].data.clone(); + let refwindowdata = window[erase_offset].data.clone(); + let refwindowcoding = window[erase_offset].coding.clone(); window[erase_offset].data = None; window[erase_offset].coding = None; - - print_window(&window); + let ledger_path = get_tmp_ledger_path("test_window_recover_basic2"); + let mut db_ledger = + generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true); // Recover it from coding - assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); - println!("** after-recover:"); - print_window(&window); + let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64) + .expect("Expected successful recovery of erased blobs"); { - // Check the result, block is here to drop locks - let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); - let ref_l = refwindow.clone().unwrap(); + let recovered_data_blob = recovered_data + .first() + .expect("Expected recovered data blob to exist"); + + let recovered_coding_blob = recovered_coding + .first() + .expect("Expected recovered coding blob to exist"); + + // Check the recovered data result + let ref_l = refwindowdata.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); - assert_eq!(window_l2.meta.size, ref_l2.meta.size); + let result = recovered_data_blob.read().unwrap(); + + assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); assert_eq!( - window_l2.data[..window_l2.meta.size], - ref_l2.data[..window_l2.meta.size] + result.data[..ref_l2.data_size().unwrap() as usize], + ref_l2.data[..ref_l2.data_size().unwrap() as usize] ); - assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); - assert_eq!(window_l2.meta.port, ref_l2.meta.port); - assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!( - window_l2.index().unwrap(), - (erase_offset + WINDOW_SIZE) as u64 - ); - } + assert_eq!(result.index().unwrap(), coding_start as u64); + assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); - println!("** make stale data block index"); - // tests erasing a coding block - let erase_offset = offset; - // Create a hole in the window by making the blob's index stale - let refwindow = window[offset].data.clone(); - if let Some(blob) = &window[erase_offset].data { - blob.write() - .unwrap() - .set_index(erase_offset as u64) - .unwrap(); // this also writes to refwindow... - } - print_window(&window); - - // Recover it from coding - assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); - println!("** after-recover:"); - print_window(&window); - - // fix refwindow, we wrote to it above... - if let Some(blob) = &refwindow { - blob.write() - .unwrap() - .set_index((erase_offset + WINDOW_SIZE) as u64) - .unwrap(); // this also writes to refwindow... 
- } - - { - // Check the result, block is here to drop locks - let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); - let ref_l = refwindow.clone().unwrap(); + // Check the recovered erasure result + let ref_l = refwindowcoding.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); - assert_eq!(window_l2.meta.size, ref_l2.meta.size); + let result = recovered_coding_blob.read().unwrap(); + + assert_eq!(result.size().unwrap(), ref_l2.size().unwrap()); assert_eq!( - window_l2.data[..window_l2.meta.size], - ref_l2.data[..window_l2.meta.size] - ); - assert_eq!(window_l2.index().unwrap(), ref_l2.index().unwrap()); - assert_eq!(window_l2.slot().unwrap(), ref_l2.slot().unwrap()); - assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); - assert_eq!(window_l2.meta.port, ref_l2.meta.port); - assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!( - window_l2.index().unwrap(), - (erase_offset + WINDOW_SIZE) as u64 + result.data()[..ref_l2.size().unwrap() as usize], + ref_l2.data()[..ref_l2.size().unwrap() as usize] ); + assert_eq!(result.index().unwrap(), coding_start as u64); + assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64); } + drop(db_ledger); + DbLedger::destroy(&ledger_path) + .expect("Expected successful destruction of database ledger"); } // //TODO This needs to be reworked @@ -912,25 +917,25 @@ mod test { // logger::setup(); // let offset = 4; // let data_len = 16; - // let num_blobs = erasure::NUM_DATA + 2; + // let num_blobs = NUM_DATA + 2; // let (mut window, blobs_len) = generate_window(data_len, offset, num_blobs); // println!("** after-gen:"); // print_window(&window); - // assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok()); + // assert!(generate_coding(&mut window, offset, blobs_len).is_ok()); // println!("** after-coding:"); // print_window(&window); // let refwindow = window[offset + 1].clone(); // window[offset + 1] = None; // window[offset + 2] = None; - // window[offset + erasure::SET_SIZE + 3] = None; - // window[offset + (2 * erasure::SET_SIZE) + 0] = None; - // window[offset + (2 * erasure::SET_SIZE) + 1] = None; - // window[offset + (2 * erasure::SET_SIZE) + 2] = None; - // let window_l0 = &(window[offset + (3 * erasure::SET_SIZE)]).clone().unwrap(); + // window[offset + SET_SIZE + 3] = None; + // window[offset + (2 * SET_SIZE) + 0] = None; + // window[offset + (2 * SET_SIZE) + 1] = None; + // window[offset + (2 * SET_SIZE) + 2] = None; + // let window_l0 = &(window[offset + (3 * SET_SIZE)]).clone().unwrap(); // window_l0.write().unwrap().data[0] = 55; // println!("** after-nulling:"); // print_window(&window); - // assert!(erasure::recover(&mut window, offset, offset + blobs_len).is_ok()); + // assert!(recover(&mut window, offset, offset + blobs_len).is_ok()); // println!("** after-restore:"); // print_window(&window); // let window_l = window[offset + 1].clone().unwrap(); diff --git a/src/packet.rs b/src/packet.rs index 30a036de98..292dcc79ed 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -14,6 +14,7 @@ use serde::Serialize; use solana_sdk::hash::Hash; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; +use std::cmp; use std::fmt; use std::io; use std::mem::size_of; @@ -274,6 +275,15 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1; pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64); impl Blob { + pub fn new(data: &[u8]) -> Self { + let mut blob = Self::default(); + let data_len = cmp::min(data.len(), blob.data.len()); + let bytes = &data[..data_len]; 
+        blob.data[..data_len].copy_from_slice(bytes);
+        blob.meta.size = blob.data_size().expect("Expected valid data size") as usize;
+        blob
+    }
+
     pub fn slot(&self) -> Result<u64> {
         let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]);
         let r = rdr.read_u64::<LittleEndian>()?;
diff --git a/src/window.rs b/src/window.rs
index bafdead363..19f05cd872 100644
--- a/src/window.rs
+++ b/src/window.rs
@@ -4,8 +4,6 @@ use cluster_info::ClusterInfo;
 use counter::Counter;
 use entry::reconstruct_entries_from_blobs;
 use entry::Entry;
-#[cfg(feature = "erasure")]
-use erasure;
 use leader_scheduler::LeaderScheduler;
 use log::Level;
 use packet::SharedBlob;
@@ -323,14 +321,6 @@ impl WindowUtil for Window {
         self[w].leader_unknown = leader_unknown;
         *pending_retransmits = true;
 
-        #[cfg(feature = "erasure")]
-        {
-            let window_size = self.window_size();
-            if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() {
-                trace!("{}: erasure::recover failed", id);
-            }
-        }
-
         // push all contiguous blobs into consumed queue, increment consumed
         loop {
             let k = (*consumed % self.window_size()) as usize;
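
For readers tracing the new recover() path, the erasure-set arithmetic and the recovery bound reduce to a few lines. The sketch below is illustration only, not part of the patch: the NUM_DATA and NUM_CODING values are assumed example values (the patch elides the actual constants), and erasure_set_bounds/recoverable are hypothetical helper names mirroring the block_start_idx/coding_start_idx/block_end_idx computations and the (data_missing + coding_missing) > NUM_CODING check in recover().

    // Standalone sketch of the erasure-set geometry used by recover().
    // The constants are assumed example values, not taken from this patch.
    const NUM_DATA: usize = 16;
    const NUM_CODING: usize = 4;

    // Boundaries of the erasure set containing `start_idx`, mirroring
    // recover()'s block_start_idx / coding_start_idx / block_end_idx.
    fn erasure_set_bounds(start_idx: u64) -> (u64, u64, u64) {
        let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
        let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64;
        let block_end_idx = block_start_idx + NUM_DATA as u64;
        (block_start_idx, coding_start_idx, block_end_idx)
    }

    // recover() bails out with NotEnoughBlocksToDecode when the total number of
    // missing blobs in a set exceeds NUM_CODING, the Reed-Solomon bound.
    fn recoverable(data_missing: usize, coding_missing: usize) -> bool {
        data_missing + coding_missing <= NUM_CODING
    }

    fn main() {
        // Index 21 falls in the second erasure set: data indexes 16..32, with the
        // coding blobs keyed at indexes 28..32 of the erasure column family.
        assert_eq!(erasure_set_bounds(21), (16, 28, 32));
        // Up to NUM_CODING total losses per set can be decoded...
        assert!(recoverable(2, 2));
        // ...one more and recover() refuses to attempt the decode.
        assert!(!recoverable(3, 2));
    }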