Rocks db erasure decoding (#1900)

* Change erasure to consume new RocksDb window
* Change tests for erasure
* Remove erasure from window
* Integrate erasure decoding back into window
* Remove corrupted blobs from ledger
* Replace Erasure result with result module's Result
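In outline, the change moves blob recovery off the in-memory window and onto the RocksDb-backed ledger. A minimal before/after sketch of the entry point, with both signatures copied from the erasure.rs hunks below and bodies elided:

```rust
// Before: recovery scanned and mutated the shared in-memory window.
// pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize)
//     -> Result<()>

// After: recovery reads the data and erasure column families of DbLedger and
// returns the reconstructed blobs for the caller to persist and consume.
// pub fn recover(db_ledger: &mut DbLedger, slot: u64, start_idx: u64)
//     -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)>
```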
src/db_ledger.rs

@@ -175,6 +175,11 @@ impl LedgerColumnFamilyRaw for DataCf {
 pub struct ErasureCf {}
 
 impl ErasureCf {
+    pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> {
+        let key = Self::key(slot_height, index);
+        self.delete(db, &key)
+    }
+
     pub fn get_by_slot_index(
         &self,
         db: &DB,
@@ -270,34 +275,37 @@ impl DbLedger {
         Ok(())
     }
 
-    pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
+    pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator,
         I::Item: Borrow<SharedBlob>,
     {
+        let mut entries = vec![];
         for b in shared_blobs {
             let bl = b.borrow().read().unwrap();
             let index = bl.index()?;
             let key = DataCf::key(slot, index);
-            self.insert_data_blob(&key, &*bl)?;
+            let new_entries = self.insert_data_blob(&key, &*bl)?;
+            entries.extend(new_entries);
         }
-        Ok(())
+        Ok(entries)
     }
 
-    pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
+    pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator<Item = &'a &'a Blob>,
     {
+        let mut entries = vec![];
         for blob in blobs.into_iter() {
             let index = blob.index()?;
             let key = DataCf::key(slot, index);
-            self.insert_data_blob(&key, blob)?;
+            let new_entries = self.insert_data_blob(&key, blob)?;
+            entries.extend(new_entries);
         }
-        Ok(())
+        Ok(entries)
     }
 
-    pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
+    pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator,
         I::Item: Borrow<Entry>,
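Note on the hunk above: the writers now return `Result<Vec<Entry>>`, reporting which entries became consumable, so a caller that re-inserts recovered blobs can extend its consume queue in one motion (exactly what `try_erasure` does in src/db_window.rs further down). A self-contained toy of that contract; `Ledger` and the `u32` entries are stand-ins, not the real `DbLedger`/`Entry` types:

```rust
// Toy version of the write_shared_blobs() contract after this commit: the
// write reports back the entries it made consumable.
struct Ledger {
    stored: Vec<u32>,
}

impl Ledger {
    // Stand-in for write_shared_blobs() -> Result<Vec<Entry>>.
    fn write_blobs(&mut self, blobs: &[u32]) -> Vec<u32> {
        self.stored.extend_from_slice(blobs);
        blobs.to_vec() // pretend every blob completed an entry
    }
}

fn main() {
    let mut ledger = Ledger { stored: vec![] };
    let mut consume_queue: Vec<u32> = vec![];
    let new_entries = ledger.write_blobs(&[1, 2, 3]);
    consume_queue.extend(new_entries); // mirrors try_erasure() in this diff
    assert_eq!(consume_queue, vec![1, 2, 3]);
}
```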
src/db_window.rs (113 changed lines)
@@ -3,6 +3,8 @@ use cluster_info::ClusterInfo;
 use counter::Counter;
 use db_ledger::*;
 use entry::Entry;
+#[cfg(feature = "erasure")]
+use erasure;
 use leader_scheduler::LeaderScheduler;
 use log::Level;
 use packet::{SharedBlob, BLOB_HEADER_SIZE};
@@ -140,6 +142,12 @@ pub fn find_missing_indexes(
     let mut prev_index = start_index;
     'outer: loop {
         if !db_iterator.valid() {
+            for i in prev_index..end_index {
+                missing_indexes.push(i);
+                if missing_indexes.len() == max_missing {
+                    break;
+                }
+            }
             break;
         }
         let current_key = db_iterator.key().expect("Expect a valid key");
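The added branch changes what happens when the iterator runs off the end of the ledger: every index from `prev_index` up to `end_index` is now reported missing, capped at `max_missing`. A self-contained sketch of just that tail-filling loop:

```rust
// Mirrors the loop added to find_missing_indexes(): once the ledger iterator
// is exhausted, all remaining indexes count as missing, up to max_missing.
fn fill_tail(prev_index: u64, end_index: u64, max_missing: usize, missing: &mut Vec<u64>) {
    for i in prev_index..end_index {
        missing.push(i);
        if missing.len() == max_missing {
            break;
        }
    }
}

fn main() {
    let mut missing = Vec::new();
    fill_tail(5, 10, 3, &mut missing); // ledger ends before index 10
    assert_eq!(missing, vec![5, 6, 7]); // capped at max_missing = 3
}
```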
@@ -303,7 +311,19 @@ pub fn process_blob(
         db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
     };
 
-    // TODO: Once erasure is fixed, readd that logic here
+    #[cfg(feature = "erasure")]
+    {
+        // If write_shared_blobs() of these recovered blobs fails, don't return
+        // because consumed_entries might be nonempty from earlier, and tick height needs to
+        // be updated. Hopefully we can recover these blobs next time successfully.
+        if let Err(e) = try_erasure(db_ledger, slot, consume_queue) {
+            trace!(
+                "erasure::recover failed to write recovered coding blobs. Err: {:?}",
+                e
+            );
+        }
+    }
+
     for entry in &consumed_entries {
         *tick_height += entry.is_tick() as u64;
     }
@@ -352,9 +372,39 @@ pub fn calculate_max_repair_entry_height(
     }
 }
 
+#[cfg(feature = "erasure")]
+fn try_erasure(db_ledger: &mut DbLedger, slot: u64, consume_queue: &mut Vec<Entry>) -> Result<()> {
+    let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?;
+    if let Some(meta) = meta {
+        let (data, coding) = erasure::recover(db_ledger, slot, meta.consumed)?;
+        for c in coding {
+            let cl = c.read().unwrap();
+            let erasure_key =
+                ErasureCf::key(slot, cl.index().expect("Recovered blob must set index"));
+            let size = cl.size().expect("Recovered blob must set size");
+            db_ledger.erasure_cf.put(
+                &db_ledger.db,
+                &erasure_key,
+                &cl.data[..BLOB_HEADER_SIZE + size],
+            )?;
+        }
+
+        let entries = db_ledger.write_shared_blobs(slot, data)?;
+        consume_queue.extend(entries);
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
+    #[cfg(all(feature = "erasure", test))]
+    use entry::reconstruct_entries_from_blobs;
+    #[cfg(all(feature = "erasure", test))]
+    use erasure::test::{generate_db_ledger_from_window, setup_window_ledger};
+    #[cfg(all(feature = "erasure", test))]
+    use erasure::{NUM_CODING, NUM_DATA};
     use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
     use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
     use rocksdb::{Options, DB};
@@ -572,6 +622,18 @@ mod test {
             vec![1],
         );
+
+        // Test with end indexes that are greater than the last item in the ledger
+        let mut expected: Vec<u64> = (1..gap).collect();
+        expected.push(gap + 1);
+        assert_eq!(
+            find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap + 2) as usize),
+            expected,
+        );
+        assert_eq!(
+            find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap - 1) as usize),
+            &expected[..expected.len() - 1],
+        );
+
         for i in 0..num_entries as u64 {
             for j in 0..i {
                 let expected: Vec<u64> = (j..i)
@@ -626,4 +688,53 @@ mod test {
         DB::destroy(&Options::default(), &db_ledger_path)
             .expect("Expected successful database destruction");
     }
+
+    #[cfg(all(feature = "erasure", test))]
+    #[test]
+    pub fn test_try_erasure() {
+        // Setup the window
+        let offset = 0;
+        let num_blobs = NUM_DATA + 2;
+        let slot_height = DEFAULT_SLOT_HEIGHT;
+        let mut window = setup_window_ledger(offset, num_blobs, false, slot_height);
+        let end_index = (offset + num_blobs) % window.len();
+
+        // Test erasing a data block and an erasure block
+        let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
+
+        let erase_offset = coding_start % window.len();
+
+        // Create a hole in the window
+        let erased_data = window[erase_offset].data.clone();
+        let erased_coding = window[erase_offset].coding.clone().unwrap();
+        window[erase_offset].data = None;
+        window[erase_offset].coding = None;
+
+        // Generate the db_ledger from the window
+        let ledger_path = get_tmp_ledger_path("test_try_erasure");
+        let mut db_ledger =
+            generate_db_ledger_from_window(&ledger_path, &window, slot_height, false);
+
+        let mut consume_queue = vec![];
+        try_erasure(&mut db_ledger, slot_height, &mut consume_queue)
+            .expect("Expected successful erasure attempt");
+        window[erase_offset].data = erased_data;
+
+        let data_blobs: Vec<_> = window[erase_offset..end_index]
+            .iter()
+            .map(|slot| slot.data.clone().unwrap())
+            .collect();
+        let (expected, _) = reconstruct_entries_from_blobs(data_blobs).unwrap();
+        assert_eq!(consume_queue, expected);
+
+        let erased_coding_l = erased_coding.read().unwrap();
+        assert_eq!(
+            &db_ledger
+                .erasure_cf
+                .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64)
+                .unwrap()
+                .unwrap()[BLOB_HEADER_SIZE..],
+            &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
+        );
+    }
 }
src/erasure.rs (667 changed lines)
@@ -1,9 +1,11 @@
 // Support erasure coding
-use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
+use db_ledger::DbLedger;
+use db_window::{find_missing_coding_indexes, find_missing_data_indexes};
+use packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
+use result::{Error, Result};
 use solana_sdk::pubkey::Pubkey;
 use std::cmp;
-use std::mem;
-use std::result;
+use std::sync::{Arc, RwLock};
 use window::WindowSlot;
 
 //TODO(sakridge) pick these values
@@ -25,10 +27,9 @@ pub enum ErasureError {
     DecodeError,
     EncodeError,
     InvalidBlockSize,
+    InvalidBlobData,
 }
 
-pub type Result<T> = result::Result<T, ErasureError>;
-
 // k = number of data devices
 // m = number of coding devices
 // w = word size
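The hunks that follow all make the same mechanical substitution: the module-local `Result` alias is deleted and every `ErasureError` is wrapped in the crate-wide error type as `Error::ErasureError(..)`. A compilable miniature of the pattern; this `Error` enum is an assumed stand-in for the crate's `result` module, not its actual definition:

```rust
// Hypothetical mini-version of src/result.rs wrapping a module error.
#[derive(Debug)]
enum ErasureError {
    InvalidBlockSize,
}

#[derive(Debug)]
enum Error {
    ErasureError(ErasureError),
}

type Result<T> = std::result::Result<T, Error>;

// The per-module pattern after this commit: wrap instead of returning the
// module error directly.
fn check_block_len(len: usize, expected: usize) -> Result<()> {
    if len != expected {
        return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
    }
    Ok(())
}

fn main() {
    assert!(check_block_len(8, 16).is_err());
    assert!(check_block_len(16, 16).is_ok());
}
```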
@@ -90,7 +91,7 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result
                 block.len(),
                 block_len
             );
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         data_arg.push(block.as_ptr());
     }
@@ -102,7 +103,7 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result
                 block.len(),
                 block_len
             );
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         coding_arg.push(block.as_mut_ptr());
     }
@@ -140,7 +141,7 @@ pub fn decode_blocks(
     let mut coding_arg: Vec<*mut u8> = Vec::new();
     for x in coding.iter_mut() {
         if x.len() != block_len {
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         coding_arg.push(x.as_mut_ptr());
     }
@@ -149,7 +150,7 @@ pub fn decode_blocks(
     let mut data_arg: Vec<*mut u8> = Vec::new();
     for x in data.iter_mut() {
         if x.len() != block_len {
-            return Err(ErasureError::InvalidBlockSize);
+            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
         }
         data_arg.push(x.as_mut_ptr());
     }
@@ -172,7 +173,7 @@ pub fn decode_blocks(
     }
     trace!("");
     if ret < 0 {
-        return Err(ErasureError::DecodeError);
+        return Err(Error::ErasureError(ErasureError::DecodeError));
     }
     Ok(())
 }
@@ -256,8 +257,6 @@ pub fn generate_coding(
     // round up to the nearest jerasure alignment
     max_data_size = align!(max_data_size, JERASURE_ALIGN);
 
-    trace!("{} max_data_size: {}", id, max_data_size);
-
     let mut data_blobs = Vec::with_capacity(NUM_DATA);
     for i in block_start..block_end {
         let n = i % window.len();
@@ -311,7 +310,7 @@ pub fn generate_coding(
             }
             coding_wl.set_size(max_data_size);
             if coding_wl.set_coding().is_err() {
-                return Err(ErasureError::EncodeError);
+                return Err(Error::ErasureError(ErasureError::EncodeError));
             }
 
             coding_blobs.push(coding.clone());
@@ -347,199 +346,123 @@ pub fn generate_coding(
     Ok(())
 }
 
-// examine the window slot at idx returns
-// true if slot is empty
-// true if slot is stale (i.e. has the wrong index), old blob is flushed
-// false if slot has a blob with the right index
-fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option<SharedBlob>, c_or_d: &str) -> bool {
-    if let Some(blob) = window_slot.take() {
-        let blob_idx = blob.read().unwrap().index().unwrap();
-        if blob_idx == idx {
-            trace!("recover {}: idx: {} good {}", id, idx, c_or_d);
-            // put it back
-            mem::replace(window_slot, Some(blob));
-            false
-        } else {
-            trace!(
-                "recover {}: idx: {} old {} {}, recycling",
-                id,
-                idx,
-                c_or_d,
-                blob_idx,
-            );
-            true
-        }
-    } else {
-        trace!("recover {}: idx: {} None {}", id, idx, c_or_d);
-        // nothing there
-        true
-    }
-}
-
-// examine the window beginning at block_start for missing or
-// stale (based on block_start_idx) blobs
-// if a blob is stale, remove it from the window slot
-// side effect: block will be cleaned of old blobs
-fn find_missing(
-    id: &Pubkey,
-    block_start_idx: u64,
-    block_start: usize,
-    window: &mut [WindowSlot],
-) -> (usize, usize) {
-    let mut data_missing = 0;
-    let mut coding_missing = 0;
-    let block_end = block_start + NUM_DATA;
-    let coding_start = block_start + NUM_DATA - NUM_CODING;
-
-    // count missing blobs in the block
-    for i in block_start..block_end {
-        let idx = (i - block_start) as u64 + block_start_idx;
-        let n = i % window.len();
-
-        if is_missing(id, idx, &mut window[n].data, "data") {
-            data_missing += 1;
-        }
-
-        if i >= coding_start && is_missing(id, idx, &mut window[n].coding, "coding") {
-            coding_missing += 1;
-        }
-    }
-    (data_missing, coding_missing)
-}
-
-// Recover a missing block into window
-// missing blocks should be None or old...
-// If not enough coding or data blocks are present to restore
-// any of the blocks, the block is skipped.
-// Side effect: old blobs in a block are None'd
-pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize) -> Result<()> {
-    let block_start = start - (start % NUM_DATA);
+// Recover the missing data and coding blobs from the input ledger. Returns a vector
+// of the recovered missing data blobs and a vector of the recovered coding blobs
+pub fn recover(
+    db_ledger: &mut DbLedger,
+    slot: u64,
+    start_idx: u64,
+) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
     let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
 
-    debug!("start: {} block_start: {}", start, block_start);
+    debug!("block_start_idx: {}", block_start_idx);
 
-    let coding_start = block_start + NUM_DATA - NUM_CODING;
-    let block_end = block_start + NUM_DATA;
+    let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64;
+    let block_end_idx = block_start_idx + NUM_DATA as u64;
     trace!(
-        "recover {}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}",
-        id,
-        block_start_idx,
-        block_start,
-        coding_start,
-        block_end
+        "recover: coding_start_idx: {} block_end_idx: {}",
+        coding_start_idx,
+        block_end_idx
     );
 
-    let (data_missing, coding_missing) = find_missing(id, block_start_idx, block_start, window);
+    let data_missing =
+        find_missing_data_indexes(slot, db_ledger, block_start_idx, block_end_idx, NUM_DATA).len();
+    let coding_missing =
+        find_missing_coding_indexes(slot, db_ledger, coding_start_idx, block_end_idx, NUM_CODING)
+            .len();
 
-    // if we're not missing data, or if we have too much missin but have enough coding
+    // if we're not missing data, or if we have too much missing but have enough coding
     if data_missing == 0 {
         // nothing to do...
-        return Ok(());
+        return Ok((vec![], vec![]));
     }
 
     if (data_missing + coding_missing) > NUM_CODING {
         trace!(
-            "recover {}: start: {} skipping recovery data: {} coding: {}",
-            id,
-            block_start,
+            "recover: start: {} skipping recovery data: {} coding: {}",
+            block_start_idx,
             data_missing,
             coding_missing
         );
         // nothing to do...
-        return Err(ErasureError::NotEnoughBlocksToDecode);
+        return Err(Error::ErasureError(ErasureError::NotEnoughBlocksToDecode));
     }
 
     trace!(
-        "recover {}: recovering: data: {} coding: {}",
-        id,
+        "recover: recovering: data: {} coding: {}",
         data_missing,
         coding_missing
     );
 
     let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
-    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
     let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
-    let mut meta = None;
+    let mut missing_data: Vec<SharedBlob> = vec![];
+    let mut missing_coding: Vec<SharedBlob> = vec![];
     let mut size = None;
 
-    // add the data blobs we have into recovery blob vector
-    for i in block_start..block_end {
-        let j = i % window.len();
-
-        if let Some(b) = window[j].data.clone() {
-            if meta.is_none() {
-                meta = Some(b.read().unwrap().meta.clone());
-                trace!("recover {} meta at {} {:?}", id, j, meta);
-            }
-            blobs.push(b);
-        } else {
-            let n = SharedBlob::default();
-            window[j].data = Some(n.clone());
-            // mark the missing memory
-            blobs.push(n);
-            erasures.push((i - block_start) as i32);
-        }
-    }
-    for i in coding_start..block_end {
-        let j = i % window.len();
-        if let Some(b) = window[j].coding.clone() {
-            if size.is_none() {
-                size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE);
-                trace!(
-                    "{} recover size {} from {}",
-                    id,
-                    size.unwrap(),
-                    i as u64 + block_start_idx
-                );
-            }
-            blobs.push(b);
-        } else {
-            let n = SharedBlob::default();
-            window[j].coding = Some(n.clone());
-            //mark the missing memory
-            blobs.push(n);
-            erasures.push(((i - coding_start) + NUM_DATA) as i32);
-        }
-    }
+    // Add the data blobs we have into the recovery vector, mark the missing ones
+    for i in block_start_idx..block_end_idx {
+        let result = db_ledger
+            .data_cf
+            .get_by_slot_index(&db_ledger.db, slot, i)?;
+
+        categorize_blob(
+            &result,
+            &mut blobs,
+            &mut missing_data,
+            &mut erasures,
+            (i - block_start_idx) as i32,
+        )?;
+    }
+
+    // Add the coding blobs we have into the recovery vector, mark the missing ones
+    for i in coding_start_idx..block_end_idx {
+        let result = db_ledger
+            .erasure_cf
+            .get_by_slot_index(&db_ledger.db, slot, i)?;
+
+        categorize_blob(
+            &result,
+            &mut blobs,
+            &mut missing_coding,
+            &mut erasures,
+            ((i - coding_start_idx) + NUM_DATA as u64) as i32,
+        )?;
+
+        if let Some(b) = result {
+            if size.is_none() {
+                size = Some(b.len() - BLOB_HEADER_SIZE);
+            }
+        }
+    }
 
-    // now that we have size (from coding), zero out data blob tails
+    // Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function,
+    // we know at least one coding block must exist, so "size" will not remain None after the
+    // below processing.
     let size = size.unwrap();
-    for i in block_start..block_end {
-        let j = i % window.len();
-
-        if let Some(b) = &window[j].data {
-            let mut b_wl = b.write().unwrap();
-            for i in b_wl.meta.size..size {
-                b_wl.data[i] = 0;
-            }
-        }
-    }
 
     // marks end of erasures
     erasures.push(-1);
-    trace!("erasures[]: {} {:?} data_size: {}", id, erasures, size,);
-    //lock everything for write
-    for b in &blobs {
-        locks.push(b.write().unwrap());
-    }
+    trace!("erasures[]:{:?} data_size: {}", erasures, size,);
 
+    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
     {
         let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
         let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
+
+        for b in &blobs {
+            locks.push(b.write().unwrap());
+        }
+
         for (i, l) in locks.iter_mut().enumerate() {
             if i < NUM_DATA {
-                trace!("{} pushing data: {}", id, i);
                 data_ptrs.push(&mut l.data[..size]);
             } else {
-                trace!("{} pushing coding: {}", id, i);
                 coding_ptrs.push(&mut l.data_mut()[..size]);
             }
         }
-        trace!(
-            "{} coding_ptrs.len: {} data_ptrs.len {}",
-            id,
-            coding_ptrs.len(),
-            data_ptrs.len()
-        );
+
+        // Decode the blocks
         decode_blocks(
             data_ptrs.as_mut_slice(),
             coding_ptrs.as_mut_slice(),
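The arithmetic above always snaps `start_idx` down to the start of its erasure set and derives the coding range from it. A self-contained check of those formulas; `NUM_DATA = 16` and `NUM_CODING = 4` are assumptions about the crate's constants, which this diff never shows:

```rust
// Erasure-set index arithmetic as written in recover().
const NUM_DATA: u64 = 16; // assumed value
const NUM_CODING: u64 = 4; // assumed value

fn main() {
    let start_idx: u64 = 21; // any blob index inside the second erasure set
    let block_start_idx = start_idx - (start_idx % NUM_DATA);
    let block_end_idx = block_start_idx + NUM_DATA;
    let coding_start_idx = block_start_idx + NUM_DATA - NUM_CODING;
    // data blobs live at 16..32, coding blobs at 28..32
    assert_eq!((block_start_idx, coding_start_idx, block_end_idx), (16, 28, 32));
}
```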
@@ -547,9 +470,9 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize)
         )?;
     }
 
-    let meta = meta.unwrap();
+    // Create the missing blobs from the reconstructed data
     let mut corrupt = false;
-    // repopulate header data size from recovered blob contents
     for i in &erasures[..erasures.len() - 1] {
         let n = *i as usize;
         let mut idx = n as u64 + block_start_idx;
@@ -559,48 +482,83 @@ pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize)
             data_size = locks[n].data_size().unwrap() as usize;
             data_size -= BLOB_HEADER_SIZE;
             if data_size > BLOB_DATA_SIZE {
-                error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size);
+                error!("corrupt data blob[{}] data_size: {}", idx, data_size);
                 corrupt = true;
+                break;
             }
         } else {
             data_size = size;
             idx -= NUM_CODING as u64;
+            locks[n].set_slot(slot).unwrap();
             locks[n].set_index(idx).unwrap();
 
             if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
-                error!(
-                    "{} corrupt coding blob[{}] data_size: {}",
-                    id, idx, data_size
-                );
+                error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
                 corrupt = true;
+                break;
             }
         }
 
-        locks[n].meta = meta.clone();
         locks[n].set_size(data_size);
         trace!(
-            "{} erasures[{}] ({}) size: {} data[0]: {}",
-            id,
+            "erasures[{}] ({}) size: {} data[0]: {}",
             *i,
             idx,
             data_size,
            locks[n].data()[0]
        );
    }
-    assert!(!corrupt, " {} ", id);
+
+    if corrupt {
+        // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct
+        // the blobs again
+        for i in coding_start_idx..block_end_idx {
+            db_ledger
+                .erasure_cf
+                .delete_by_slot_index(&db_ledger.db, slot, i)?;
+        }
+        return Ok((vec![], vec![]));
+    }
+
+    Ok((missing_data, missing_coding))
+}
+
+fn categorize_blob(
+    get_blob_result: &Option<Vec<u8>>,
+    blobs: &mut Vec<SharedBlob>,
+    missing: &mut Vec<SharedBlob>,
+    erasures: &mut Vec<i32>,
+    erasure_index: i32,
+) -> Result<()> {
+    match get_blob_result {
+        Some(b) => {
+            if b.len() <= BLOB_HEADER_SIZE || b.len() > BLOB_SIZE {
+                return Err(Error::ErasureError(ErasureError::InvalidBlobData));
+            }
+            blobs.push(Arc::new(RwLock::new(Blob::new(&b))));
+        }
+        None => {
+            // Mark the missing memory
+            erasures.push(erasure_index);
+            let b = SharedBlob::default();
+            blobs.push(b.clone());
+            missing.push(b);
+        }
+    }
+
     Ok(())
 }
 
 #[cfg(test)]
-mod test {
-    use erasure;
+pub mod test {
+    use super::*;
+    use db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
+    use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
     use logger;
-    use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
+    use packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
     use rand::{thread_rng, Rng};
     use solana_sdk::pubkey::Pubkey;
     use solana_sdk::signature::{Keypair, KeypairUtil};
-    // use std::sync::{Arc, RwLock};
     use window::WindowSlot;
 
     #[test]
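`recover` and `categorize_blob` together build the `erasures` list handed to the decoder: relative positions `0..NUM_DATA` name data blocks, positions offset by `NUM_DATA` name coding blocks, and `-1` terminates the list. A self-contained illustration of that layout (the constant's value is assumed; the convention is taken from the code above):

```rust
const NUM_DATA: i32 = 16; // assumed value

fn main() {
    let mut erasures: Vec<i32> = Vec::new();
    erasures.push(2); // data blob at relative position 2 is missing
    erasures.push(1 + NUM_DATA); // missing coding blob 1, offset by NUM_DATA
    erasures.push(-1); // terminator expected by decode_blocks()
    assert_eq!(erasures, vec![2, 17, -1]);
}
```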
@@ -618,10 +576,8 @@ mod test {
         let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
 
         assert!(
-            erasure::generate_coding_blocks(
-                coding_blocks_slices.as_mut_slice(),
-                v_slices.as_slice(),
-            ).is_ok()
+            generate_coding_blocks(coding_blocks_slices.as_mut_slice(), v_slices.as_slice(),)
+                .is_ok()
         );
     }
     trace!("coding blocks:");
@@ -639,7 +595,7 @@ mod test {
         let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
 
         assert!(
-            erasure::decode_blocks(
+            decode_blocks(
                 v_slices.as_mut_slice(),
                 coding_blocks_slices.as_mut_slice(),
                 erasures.as_slice(),
@@ -654,45 +610,107 @@ mod test {
         assert_eq!(v_orig, vs[0]);
     }
 
-    fn print_window(window: &[WindowSlot]) {
-        for (i, w) in window.iter().enumerate() {
-            print!("window({:>w$}): ", i, w = 2);
-            if w.data.is_some() {
-                let window_l1 = w.data.clone().unwrap();
-                let window_l2 = window_l1.read().unwrap();
-                print!(
-                    "data index: {:?} meta.size: {} data: ",
-                    window_l2.index(),
-                    window_l2.meta.size
-                );
-                for i in 0..64 {
-                    print!("{:>w$} ", window_l2.data()[i], w = 3);
-                }
-            } else {
-                print!("data null ");
-            }
-            println!();
-            print!("window({:>w$}): ", i, w = 2);
-            if w.coding.is_some() {
-                let window_l1 = w.coding.clone().unwrap();
-                let window_l2 = window_l1.read().unwrap();
-                print!(
-                    "coding index: {:?} meta.size: {} data: ",
-                    window_l2.index(),
-                    window_l2.meta.size
-                );
-                for i in 0..8 {
-                    print!("{:>w$} ", window_l2.data()[i], w = 3);
-                }
-            } else {
-                print!("coding null");
-            }
-            println!();
-        }
-    }
+    // TODO: Temporary function used in tests to generate a database ledger
+    // from the window (which is used to generate the erasure coding)
+    // until we also transition generate_coding() and BroadcastStage to use RocksDb.
+    // Github issue: https://github.com/solana-labs/solana/issues/1899.
+    pub fn generate_db_ledger_from_window(
+        ledger_path: &str,
+        window: &[WindowSlot],
+        slot_height: u64,
+        use_random: bool,
+    ) -> DbLedger {
+        let mut db_ledger =
+            DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
+        for slot in window {
+            if let Some(ref data) = slot.data {
+                // If we're using gibberish blobs, skip validation checks and insert
+                // directly into the ledger
+                if use_random {
+                    let data_l = data.read().unwrap();
+                    db_ledger
+                        .data_cf
+                        .put_by_slot_index(
+                            &db_ledger.db,
+                            slot_height,
+                            data_l.index().unwrap(),
+                            &data_l.data[..data_l.data_size().unwrap() as usize],
+                        ).expect("Expected successful put into data column of ledger");
+                } else {
+                    db_ledger
+                        .write_shared_blobs(slot_height, vec![data].into_iter())
+                        .unwrap();
+                }
+            }
+
+            if let Some(ref coding) = slot.coding {
+                let coding_lock = coding.read().unwrap();
+
+                let index = coding_lock
+                    .index()
+                    .expect("Expected coding blob to have valid index");
+
+                let data_size = coding_lock
+                    .size()
+                    .expect("Expected coding blob to have valid data size");
+
+                db_ledger
+                    .erasure_cf
+                    .put_by_slot_index(
+                        &db_ledger.db,
+                        slot_height,
+                        index,
+                        &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
+                    ).unwrap();
+            }
+        }
+
+        db_ledger
+    }
+
+    pub fn setup_window_ledger(
+        offset: usize,
+        num_blobs: usize,
+        use_random_window: bool,
+        slot: u64,
+    ) -> Vec<WindowSlot> {
+        // Generate a window
+        let mut window = {
+            if use_random_window {
+                generate_window(offset, num_blobs, slot)
+            } else {
+                generate_entry_window(offset, num_blobs)
+            }
+        };
+
+        for slot in &window {
+            if let Some(blob) = &slot.data {
+                let blob_r = blob.read().unwrap();
+                assert!(!blob_r.is_coding());
+            }
+        }
+
+        // Generate the coding blocks
+        let mut index = (NUM_DATA + 2) as u64;
+        assert!(
+            generate_coding(
+                &Pubkey::default(),
+                &mut window,
+                offset as u64,
+                num_blobs,
+                &mut index
+            ).is_ok()
+        );
+        assert_eq!(index, (NUM_DATA - NUM_CODING) as u64);
+
+        // put junk in the tails, simulates re-used blobs
+        scramble_window_tails(&mut window, num_blobs);
+
+        window
+    }
 
     const WINDOW_SIZE: usize = 64;
-    fn generate_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
+    fn generate_window(offset: usize, num_blobs: usize, slot: u64) -> Vec<WindowSlot> {
         let mut window = vec![
             WindowSlot {
                 data: None,
@@ -728,7 +746,7 @@ mod test {
             blobs.push(b_);
         }
 
-        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 13);
+        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot);
         for b in blobs {
             let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
 
@@ -737,6 +755,27 @@ mod test {
         window
     }
 
+    fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
+        let mut window = vec![
+            WindowSlot {
+                data: None,
+                coding: None,
+                leader_unknown: false,
+            };
+            WINDOW_SIZE
+        ];
+        let entries = make_tiny_test_entries(num_blobs);
+        let blobs = entries.to_blobs();
+
+        index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 13);
+        for b in blobs.into_iter() {
+            let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
+
+            window[idx].data = Some(b);
+        }
+        window
+    }
+
     fn scramble_window_tails(window: &mut [WindowSlot], num_blobs: usize) {
         for i in 0..num_blobs {
             if let Some(b) = &window[i].data {
@@ -753,156 +792,122 @@ mod test {
         }
     }
 
+    // Remove a data block, test for successful recovery
     #[test]
     pub fn test_window_recover_basic() {
         logger::setup();
-        // Generate a window
+
+        // Setup the window
         let offset = 0;
-        let num_blobs = erasure::NUM_DATA + 2;
-        let mut window = generate_window(WINDOW_SIZE, num_blobs);
-
-        for slot in &window {
-            if let Some(blob) = &slot.data {
-                let blob_r = blob.read().unwrap();
-                assert!(!blob_r.is_coding());
-            }
-        }
-
-        println!("** after-gen-window:");
-        print_window(&window);
-
-        // Generate the coding blocks
-        let mut index = (erasure::NUM_DATA + 2) as u64;
-        let id = Pubkey::default();
-        assert!(
-            erasure::generate_coding(&id, &mut window, offset as u64, num_blobs, &mut index)
-                .is_ok()
-        );
-        assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64);
-
-        println!("** after-gen-coding:");
-        print_window(&window);
+        let num_blobs = NUM_DATA + 2;
+        let mut window = setup_window_ledger(offset, num_blobs, true, DEFAULT_SLOT_HEIGHT);
 
         println!("** whack data block:");
-        // test erasing a data block
-        let erase_offset = offset;
+        // Test erasing a data block
+        let erase_offset = offset % window.len();
 
         // Create a hole in the window
         let refwindow = window[erase_offset].data.clone();
         window[erase_offset].data = None;
-        print_window(&window);
 
-        // put junk in the tails, simulates re-used blobs
-        scramble_window_tails(&mut window, num_blobs);
+        // Generate the db_ledger from the window
+        let ledger_path = get_tmp_ledger_path("test_window_recover_basic");
+        let mut db_ledger =
+            generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true);
 
         // Recover it from coding
-        assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
-        println!("** after-recover:");
-        print_window(&window);
+        let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64)
+            .expect("Expected successful recovery of erased blobs");
 
+        assert!(recovered_coding.is_empty());
         {
             // Check the result, block is here to drop locks
-            let window_l = window[erase_offset].data.clone().unwrap();
-            let window_l2 = window_l.read().unwrap();
+            let recovered_blob = recovered_data
+                .first()
+                .expect("Expected recovered data blob to exist");
             let ref_l = refwindow.clone().unwrap();
             let ref_l2 = ref_l.read().unwrap();
+            let result = recovered_blob.read().unwrap();
 
-            assert_eq!(window_l2.meta.size, ref_l2.meta.size);
+            assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
             assert_eq!(
-                window_l2.data[..window_l2.meta.size],
-                ref_l2.data[..window_l2.meta.size]
+                result.data[..ref_l2.data_size().unwrap() as usize],
+                ref_l2.data[..ref_l2.data_size().unwrap() as usize]
             );
-            assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
-            assert_eq!(window_l2.meta.port, ref_l2.meta.port);
-            assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
-            assert_eq!(
-                window_l2.index().unwrap(),
-                (erase_offset + WINDOW_SIZE) as u64
-            );
+            assert_eq!(result.index().unwrap(), offset as u64);
+            assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
         }
+        drop(db_ledger);
+        DbLedger::destroy(&ledger_path)
+            .expect("Expected successful destruction of database ledger");
+    }
+
+    // Remove a data and coding block, test for successful recovery
+    #[test]
+    pub fn test_window_recover_basic2() {
+        logger::setup();
+
+        // Setup the window
+        let offset = 0;
+        let num_blobs = NUM_DATA + 2;
+        let mut window = setup_window_ledger(offset, num_blobs, true, DEFAULT_SLOT_HEIGHT);
 
         println!("** whack coding block and data block");
-        // tests erasing a coding block and a data block
-        let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING;
+        // Tests erasing a coding block and a data block
+        let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
+        let erase_offset = coding_start % window.len();
 
         // Create a hole in the window
-        let refwindow = window[erase_offset].data.clone();
+        let refwindowdata = window[erase_offset].data.clone();
+        let refwindowcoding = window[erase_offset].coding.clone();
         window[erase_offset].data = None;
         window[erase_offset].coding = None;
-        print_window(&window);
+        let ledger_path = get_tmp_ledger_path("test_window_recover_basic2");
+        let mut db_ledger =
+            generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true);
 
         // Recover it from coding
-        assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
-        println!("** after-recover:");
-        print_window(&window);
+        let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64)
+            .expect("Expected successful recovery of erased blobs");
 
         {
-            // Check the result, block is here to drop locks
-            let window_l = window[erase_offset].data.clone().unwrap();
-            let window_l2 = window_l.read().unwrap();
-            let ref_l = refwindow.clone().unwrap();
+            let recovered_data_blob = recovered_data
+                .first()
+                .expect("Expected recovered data blob to exist");
+            let recovered_coding_blob = recovered_coding
+                .first()
+                .expect("Expected recovered coding blob to exist");
+
+            // Check the recovered data result
+            let ref_l = refwindowdata.clone().unwrap();
             let ref_l2 = ref_l.read().unwrap();
-            assert_eq!(window_l2.meta.size, ref_l2.meta.size);
+            let result = recovered_data_blob.read().unwrap();
+
+            assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
             assert_eq!(
-                window_l2.data[..window_l2.meta.size],
-                ref_l2.data[..window_l2.meta.size]
+                result.data[..ref_l2.data_size().unwrap() as usize],
+                ref_l2.data[..ref_l2.data_size().unwrap() as usize]
             );
-            assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
-            assert_eq!(window_l2.meta.port, ref_l2.meta.port);
-            assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
-            assert_eq!(
-                window_l2.index().unwrap(),
-                (erase_offset + WINDOW_SIZE) as u64
-            );
-        }
-
-        println!("** make stale data block index");
-        // tests erasing a coding block
-        let erase_offset = offset;
-        // Create a hole in the window by making the blob's index stale
-        let refwindow = window[offset].data.clone();
-        if let Some(blob) = &window[erase_offset].data {
-            blob.write()
-                .unwrap()
-                .set_index(erase_offset as u64)
-                .unwrap(); // this also writes to refwindow...
-        }
-        print_window(&window);
-
-        // Recover it from coding
-        assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
-        println!("** after-recover:");
-        print_window(&window);
-
-        // fix refwindow, we wrote to it above...
-        if let Some(blob) = &refwindow {
-            blob.write()
-                .unwrap()
-                .set_index((erase_offset + WINDOW_SIZE) as u64)
-                .unwrap(); // this also writes to refwindow...
-        }
-
-        {
-            // Check the result, block is here to drop locks
-            let window_l = window[erase_offset].data.clone().unwrap();
-            let window_l2 = window_l.read().unwrap();
-            let ref_l = refwindow.clone().unwrap();
+            assert_eq!(result.index().unwrap(), coding_start as u64);
+            assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
+
+            // Check the recovered erasure result
+            let ref_l = refwindowcoding.clone().unwrap();
             let ref_l2 = ref_l.read().unwrap();
-            assert_eq!(window_l2.meta.size, ref_l2.meta.size);
+            let result = recovered_coding_blob.read().unwrap();
+
+            assert_eq!(result.size().unwrap(), ref_l2.size().unwrap());
             assert_eq!(
-                window_l2.data[..window_l2.meta.size],
-                ref_l2.data[..window_l2.meta.size]
+                result.data()[..ref_l2.size().unwrap() as usize],
+                ref_l2.data()[..ref_l2.size().unwrap() as usize]
             );
-            assert_eq!(window_l2.index().unwrap(), ref_l2.index().unwrap());
-            assert_eq!(window_l2.slot().unwrap(), ref_l2.slot().unwrap());
-            assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
-            assert_eq!(window_l2.meta.port, ref_l2.meta.port);
-            assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
-            assert_eq!(
-                window_l2.index().unwrap(),
-                (erase_offset + WINDOW_SIZE) as u64
-            );
+            assert_eq!(result.index().unwrap(), coding_start as u64);
+            assert_eq!(result.slot().unwrap(), DEFAULT_SLOT_HEIGHT as u64);
         }
+        drop(db_ledger);
+        DbLedger::destroy(&ledger_path)
+            .expect("Expected successful destruction of database ledger");
     }
 
     // //TODO This needs to be reworked
@@ -912,25 +917,25 @@ mod test {
     // logger::setup();
     // let offset = 4;
     // let data_len = 16;
-    // let num_blobs = erasure::NUM_DATA + 2;
+    // let num_blobs = NUM_DATA + 2;
     // let (mut window, blobs_len) = generate_window(data_len, offset, num_blobs);
     // println!("** after-gen:");
     // print_window(&window);
-    // assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
+    // assert!(generate_coding(&mut window, offset, blobs_len).is_ok());
     // println!("** after-coding:");
     // print_window(&window);
     // let refwindow = window[offset + 1].clone();
     // window[offset + 1] = None;
     // window[offset + 2] = None;
-    // window[offset + erasure::SET_SIZE + 3] = None;
-    // window[offset + (2 * erasure::SET_SIZE) + 0] = None;
-    // window[offset + (2 * erasure::SET_SIZE) + 1] = None;
-    // window[offset + (2 * erasure::SET_SIZE) + 2] = None;
-    // let window_l0 = &(window[offset + (3 * erasure::SET_SIZE)]).clone().unwrap();
+    // window[offset + SET_SIZE + 3] = None;
+    // window[offset + (2 * SET_SIZE) + 0] = None;
+    // window[offset + (2 * SET_SIZE) + 1] = None;
+    // window[offset + (2 * SET_SIZE) + 2] = None;
+    // let window_l0 = &(window[offset + (3 * SET_SIZE)]).clone().unwrap();
     // window_l0.write().unwrap().data[0] = 55;
     // println!("** after-nulling:");
     // print_window(&window);
-    // assert!(erasure::recover(&mut window, offset, offset + blobs_len).is_ok());
+    // assert!(recover(&mut window, offset, offset + blobs_len).is_ok());
     // println!("** after-restore:");
    // print_window(&window);
    // let window_l = window[offset + 1].clone().unwrap();
|
@ -14,6 +14,7 @@ use serde::Serialize;
|
|||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
pub use solana_sdk::packet::PACKET_DATA_SIZE;
|
pub use solana_sdk::packet::PACKET_DATA_SIZE;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::cmp;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
@@ -274,6 +275,15 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
 pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
 
 impl Blob {
+    pub fn new(data: &[u8]) -> Self {
+        let mut blob = Self::default();
+        let data_len = cmp::min(data.len(), blob.data.len());
+        let bytes = &data[..data_len];
+        blob.data[..data_len].copy_from_slice(bytes);
+        blob.meta.size = blob.data_size().expect("Expected valid data size") as usize;
+        blob
+    }
+
     pub fn slot(&self) -> Result<u64> {
         let mut rdr = io::Cursor::new(&self.data[0..BLOB_SLOT_END]);
         let r = rdr.read_u64::<LittleEndian>()?;
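The new `Blob::new` clamps its input to the blob's fixed buffer before copying, then restores `meta.size` from the size field already serialized in the header. A minimal sketch of the clamped copy only; `BUF` is a tiny stand-in for the real buffer length:

```rust
const BUF: usize = 8; // stand-in, not the real blob buffer size

// Copy-and-clamp as in Blob::new(): never read past the destination buffer.
fn new_buf(data: &[u8]) -> [u8; BUF] {
    let mut buf = [0u8; BUF];
    let n = std::cmp::min(data.len(), buf.len());
    buf[..n].copy_from_slice(&data[..n]);
    buf
}

fn main() {
    let long = [7u8; 32];
    assert_eq!(new_buf(&long), [7u8; 8]); // input clamped to 8 bytes
}
```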
|
@ -4,8 +4,6 @@ use cluster_info::ClusterInfo;
|
|||||||
use counter::Counter;
|
use counter::Counter;
|
||||||
use entry::reconstruct_entries_from_blobs;
|
use entry::reconstruct_entries_from_blobs;
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
#[cfg(feature = "erasure")]
|
|
||||||
use erasure;
|
|
||||||
use leader_scheduler::LeaderScheduler;
|
use leader_scheduler::LeaderScheduler;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
use packet::SharedBlob;
|
use packet::SharedBlob;
|
||||||
@@ -323,14 +321,6 @@ impl WindowUtil for Window {
         self[w].leader_unknown = leader_unknown;
         *pending_retransmits = true;
 
-        #[cfg(feature = "erasure")]
-        {
-            let window_size = self.window_size();
-            if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() {
-                trace!("{}: erasure::recover failed", id);
-            }
-        }
-
         // push all contiguous blobs into consumed queue, increment consumed
         loop {
             let k = (*consumed % self.window_size()) as usize;
|
Reference in New Issue
Block a user