Store another size in the data block so it is coded as well
This commit is contained in:
committed by
Greg Fitzgerald
parent
b845245614
commit
34834c5af9
@ -1,6 +1,6 @@
|
|||||||
// Support erasure coding
|
// Support erasure coding
|
||||||
|
|
||||||
use packet::{BlobRecycler, SharedBlob};
|
use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
|
||||||
use std::result;
|
use std::result;
|
||||||
|
|
||||||
//TODO(sakridge) pick these values
|
//TODO(sakridge) pick these values
|
||||||
@ -219,9 +219,10 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
|
|||||||
}
|
}
|
||||||
data_locks.push(lck);
|
data_locks.push(lck);
|
||||||
}
|
}
|
||||||
|
trace!("max_data_size: {}", max_data_size);
|
||||||
for (i, l) in data_locks.iter_mut().enumerate() {
|
for (i, l) in data_locks.iter_mut().enumerate() {
|
||||||
trace!("i: {} data: {}", i, l.data[0]);
|
trace!("i: {} data: {}", i, l.data[0]);
|
||||||
data_ptrs.push(&l.data()[..max_data_size]);
|
data_ptrs.push(&l.data[..max_data_size]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// generate coding ptr array
|
// generate coding ptr array
|
||||||
@ -234,7 +235,7 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let w_l = window[n].clone().unwrap();
|
let w_l = window[n].clone().unwrap();
|
||||||
w_l.write().unwrap().meta.size = max_data_size;
|
w_l.write().unwrap().set_size(max_data_size);
|
||||||
if w_l.write().unwrap().set_coding().is_err() {
|
if w_l.write().unwrap().set_coding().is_err() {
|
||||||
return Err(ErasureError::EncodeError);
|
return Err(ErasureError::EncodeError);
|
||||||
}
|
}
|
||||||
@ -251,7 +252,7 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
for (i, l) in coding_locks.iter_mut().enumerate() {
|
for (i, l) in coding_locks.iter_mut().enumerate() {
|
||||||
trace!("i: {} coding: {}", i, l.data[0]);
|
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
|
||||||
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
|
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,13 +315,21 @@ pub fn recover(
|
|||||||
if (data_missing + coded_missing) <= MAX_MISSING {
|
if (data_missing + coded_missing) <= MAX_MISSING {
|
||||||
let mut blobs: Vec<SharedBlob> = Vec::new();
|
let mut blobs: Vec<SharedBlob> = Vec::new();
|
||||||
let mut locks = Vec::new();
|
let mut locks = Vec::new();
|
||||||
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
|
|
||||||
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
|
|
||||||
let mut erasures: Vec<i32> = Vec::new();
|
let mut erasures: Vec<i32> = Vec::new();
|
||||||
|
let mut meta = None;
|
||||||
|
let mut size = None;
|
||||||
for i in block_start..coding_end {
|
for i in block_start..coding_end {
|
||||||
let j = i % window.len();
|
let j = i % window.len();
|
||||||
let mut b = &mut window[j];
|
let mut b = &mut window[j];
|
||||||
if b.is_some() {
|
if b.is_some() {
|
||||||
|
if i >= NUM_DATA && size.is_none() {
|
||||||
|
let bl = b.clone().unwrap();
|
||||||
|
size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE);
|
||||||
|
}
|
||||||
|
if meta.is_none() {
|
||||||
|
let bl = b.clone().unwrap();
|
||||||
|
meta = Some(bl.read().unwrap().meta.clone());
|
||||||
|
}
|
||||||
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
|
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -331,18 +340,21 @@ pub fn recover(
|
|||||||
erasures.push((i - block_start) as i32);
|
erasures.push((i - block_start) as i32);
|
||||||
}
|
}
|
||||||
erasures.push(-1);
|
erasures.push(-1);
|
||||||
trace!("erasures: {:?}", erasures);
|
trace!("erasures: {:?} data_size: {} header_size: {}", erasures, size.unwrap(), BLOB_HEADER_SIZE);
|
||||||
//lock everything
|
//lock everything
|
||||||
for b in &blobs {
|
for b in &blobs {
|
||||||
locks.push(b.write().expect("'locks' arr in pb fn recover"));
|
locks.push(b.write().expect("'locks' arr in pb fn recover"));
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
|
||||||
|
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
|
||||||
for (i, l) in locks.iter_mut().enumerate() {
|
for (i, l) in locks.iter_mut().enumerate() {
|
||||||
if i >= NUM_DATA {
|
if i >= NUM_DATA {
|
||||||
trace!("pushing coding: {}", i);
|
trace!("pushing coding: {}", i);
|
||||||
coding_ptrs.push(&l.data);
|
coding_ptrs.push(&l.data()[..size.unwrap()]);
|
||||||
} else {
|
} else {
|
||||||
trace!("pushing data: {}", i);
|
trace!("pushing data: {}", i);
|
||||||
data_ptrs.push(&mut l.data);
|
data_ptrs.push(&mut l.data[..size.unwrap()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace!(
|
trace!(
|
||||||
@ -352,6 +364,14 @@ pub fn recover(
|
|||||||
);
|
);
|
||||||
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
|
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
|
||||||
}
|
}
|
||||||
|
for i in &erasures[..erasures.len() - 1] {
|
||||||
|
let idx = *i as usize;
|
||||||
|
let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
|
||||||
|
locks[idx].meta = meta.clone().unwrap();
|
||||||
|
locks[idx].set_size(data_size as usize);
|
||||||
|
trace!("erasures[{}] size: {} data[0]: {}", *i, data_size, locks[idx].data()[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
block_start += NUM_CODED;
|
block_start += NUM_CODED;
|
||||||
}
|
}
|
||||||
@ -362,7 +382,7 @@ pub fn recover(
|
|||||||
mod test {
|
mod test {
|
||||||
use erasure;
|
use erasure;
|
||||||
use logger;
|
use logger;
|
||||||
use packet::{BlobRecycler, SharedBlob};
|
use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
|
||||||
use crdt;
|
use crdt;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use signature::KeyPair;
|
use signature::KeyPair;
|
||||||
@ -447,7 +467,7 @@ mod test {
|
|||||||
let b = blob_recycler.allocate();
|
let b = blob_recycler.allocate();
|
||||||
let b_ = b.clone();
|
let b_ = b.clone();
|
||||||
let mut w = b.write().unwrap();
|
let mut w = b.write().unwrap();
|
||||||
w.meta.size = data_len;
|
w.set_size(data_len);
|
||||||
for k in 0..data_len {
|
for k in 0..data_len {
|
||||||
w.data_mut()[k] = (k + i) as u8;
|
w.data_mut()[k] = (k + i) as u8;
|
||||||
}
|
}
|
||||||
@ -503,13 +523,18 @@ mod test {
|
|||||||
|
|
||||||
// Check the result
|
// Check the result
|
||||||
let window_l = window[erase_offset].clone().unwrap();
|
let window_l = window[erase_offset].clone().unwrap();
|
||||||
|
let window_l2 = window_l.read().unwrap();
|
||||||
let ref_l = refwindow.clone().unwrap();
|
let ref_l = refwindow.clone().unwrap();
|
||||||
|
let ref_l2 = ref_l.read().unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
window_l.read().unwrap().data()[..data_len],
|
window_l2.data[..(data_len + BLOB_HEADER_SIZE)],
|
||||||
ref_l.read().unwrap().data()[..data_len]
|
ref_l2.data[..(data_len + BLOB_HEADER_SIZE)]
|
||||||
);
|
);
|
||||||
assert_eq!(window_l.read().unwrap().meta.size, data_len);
|
assert_eq!(window_l2.meta.size, ref_l2.meta.size);
|
||||||
assert_eq!(window_l.read().unwrap().get_index().unwrap(), erase_offset as u64);
|
assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
|
||||||
|
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
|
||||||
|
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
|
||||||
|
assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO This needs to be reworked
|
//TODO This needs to be reworked
|
||||||
|
@ -272,8 +272,14 @@ pub fn to_blobs<T: Serialize>(
|
|||||||
const BLOB_INDEX_END: usize = size_of::<u64>();
|
const BLOB_INDEX_END: usize = size_of::<u64>();
|
||||||
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
|
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
|
||||||
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
|
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
|
||||||
|
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();
|
||||||
|
|
||||||
|
macro_rules! align {
|
||||||
|
($x:expr, $align: expr) => ($x + ($align - 1) & !($align - 1));
|
||||||
|
}
|
||||||
|
|
||||||
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
|
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
|
||||||
|
pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
|
||||||
|
|
||||||
impl Blob {
|
impl Blob {
|
||||||
pub fn get_index(&self) -> Result<u64> {
|
pub fn get_index(&self) -> Result<u64> {
|
||||||
@ -322,14 +328,29 @@ impl Blob {
|
|||||||
self.set_flags(flags | BLOB_FLAG_IS_CODING)
|
self.set_flags(flags | BLOB_FLAG_IS_CODING)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_data_size(&self) -> Result<u64> {
|
||||||
|
let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]);
|
||||||
|
let r = rdr.read_u64::<LittleEndian>()?;
|
||||||
|
Ok(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_data_size(&mut self, ix: u64) -> Result<()> {
|
||||||
|
let mut wtr = vec![];
|
||||||
|
wtr.write_u64::<LittleEndian>(ix)?;
|
||||||
|
self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn data(&self) -> &[u8] {
|
pub fn data(&self) -> &[u8] {
|
||||||
&self.data[BLOB_FLAGS_END..]
|
&self.data[BLOB_HEADER_SIZE..]
|
||||||
}
|
}
|
||||||
pub fn data_mut(&mut self) -> &mut [u8] {
|
pub fn data_mut(&mut self) -> &mut [u8] {
|
||||||
&mut self.data[BLOB_FLAGS_END..]
|
&mut self.data[BLOB_HEADER_SIZE..]
|
||||||
}
|
}
|
||||||
pub fn set_size(&mut self, size: usize) {
|
pub fn set_size(&mut self, size: usize) {
|
||||||
self.meta.size = size + BLOB_FLAGS_END;
|
let new_size = size + BLOB_HEADER_SIZE;
|
||||||
|
self.meta.size = new_size;
|
||||||
|
self.set_data_size(new_size as u64).unwrap();
|
||||||
}
|
}
|
||||||
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
||||||
let mut v = VecDeque::new();
|
let mut v = VecDeque::new();
|
||||||
|
Reference in New Issue
Block a user