From 34834c5af9226b74f573b40315c0a24a6a08e33c Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Mon, 4 Jun 2018 11:43:23 -0700 Subject: [PATCH] Store another size in the data block so it is coded as well --- src/erasure.rs | 77 +++++++++++++++++++++++++++++++++----------------- src/packet.rs | 27 ++++++++++++++++-- 2 files changed, 75 insertions(+), 29 deletions(-) diff --git a/src/erasure.rs b/src/erasure.rs index 6837df14fa..ccdba609d3 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,6 +1,6 @@ // Support erasure coding -use packet::{BlobRecycler, SharedBlob}; +use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; use std::result; //TODO(sakridge) pick these values @@ -219,9 +219,10 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu } data_locks.push(lck); } + trace!("max_data_size: {}", max_data_size); for (i, l) in data_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); - data_ptrs.push(&l.data()[..max_data_size]); + data_ptrs.push(&l.data[..max_data_size]); } // generate coding ptr array @@ -234,7 +235,7 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu return Ok(()); } let w_l = window[n].clone().unwrap(); - w_l.write().unwrap().meta.size = max_data_size; + w_l.write().unwrap().set_size(max_data_size); if w_l.write().unwrap().set_coding().is_err() { return Err(ErasureError::EncodeError); } @@ -251,7 +252,7 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu ); } for (i, l) in coding_locks.iter_mut().enumerate() { - trace!("i: {} coding: {}", i, l.data[0]); + trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size); coding_ptrs.push(&mut l.data_mut()[..max_data_size]); } @@ -314,13 +315,21 @@ pub fn recover( if (data_missing + coded_missing) <= MAX_MISSING { let mut blobs: Vec = Vec::new(); let mut locks = Vec::new(); - let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); - let mut coding_ptrs: Vec<&[u8]> = Vec::new(); let mut erasures: Vec = Vec::new(); + let mut meta = None; + let mut size = None; for i in block_start..coding_end { let j = i % window.len(); let mut b = &mut window[j]; if b.is_some() { + if i >= NUM_DATA && size.is_none() { + let bl = b.clone().unwrap(); + size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE); + } + if meta.is_none() { + let bl = b.clone().unwrap(); + meta = Some(bl.read().unwrap().meta.clone()); + } blobs.push(b.clone().expect("'blobs' arr in pb fn recover")); continue; } @@ -331,26 +340,37 @@ pub fn recover( erasures.push((i - block_start) as i32); } erasures.push(-1); - trace!("erasures: {:?}", erasures); + trace!("erasures: {:?} data_size: {} header_size: {}", erasures, size.unwrap(), BLOB_HEADER_SIZE); //lock everything for b in &blobs { locks.push(b.write().expect("'locks' arr in pb fn recover")); } - for (i, l) in locks.iter_mut().enumerate() { - if i >= NUM_DATA { - trace!("pushing coding: {}", i); - coding_ptrs.push(&l.data); - } else { - trace!("pushing data: {}", i); - data_ptrs.push(&mut l.data); + { + let mut coding_ptrs: Vec<&[u8]> = Vec::new(); + let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); + for (i, l) in locks.iter_mut().enumerate() { + if i >= NUM_DATA { + trace!("pushing coding: {}", i); + coding_ptrs.push(&l.data()[..size.unwrap()]); + } else { + trace!("pushing data: {}", i); + data_ptrs.push(&mut l.data[..size.unwrap()]); + } } + trace!( + "coding_ptrs.len: {} data_ptrs.len {}", + coding_ptrs.len(), + data_ptrs.len() + ); + decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; + } + for i in &erasures[..erasures.len() - 1] { + let idx = *i as usize; + let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64; + locks[idx].meta = meta.clone().unwrap(); + locks[idx].set_size(data_size as usize); + trace!("erasures[{}] size: {} data[0]: {}", *i, data_size, locks[idx].data()[0]); } - trace!( - "coding_ptrs.len: {} data_ptrs.len {}", - coding_ptrs.len(), - data_ptrs.len() - ); - decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; } } block_start += NUM_CODED; @@ -362,7 +382,7 @@ pub fn recover( mod test { use erasure; use logger; - use packet::{BlobRecycler, SharedBlob}; + use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; use crdt; use std::sync::{Arc, RwLock}; use signature::KeyPair; @@ -447,7 +467,7 @@ mod test { let b = blob_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); - w.meta.size = data_len; + w.set_size(data_len); for k in 0..data_len { w.data_mut()[k] = (k + i) as u8; } @@ -503,13 +523,18 @@ mod test { // Check the result let window_l = window[erase_offset].clone().unwrap(); + let window_l2 = window_l.read().unwrap(); let ref_l = refwindow.clone().unwrap(); + let ref_l2 = ref_l.read().unwrap(); assert_eq!( - window_l.read().unwrap().data()[..data_len], - ref_l.read().unwrap().data()[..data_len] + window_l2.data[..(data_len + BLOB_HEADER_SIZE)], + ref_l2.data[..(data_len + BLOB_HEADER_SIZE)] ); - assert_eq!(window_l.read().unwrap().meta.size, data_len); - assert_eq!(window_l.read().unwrap().get_index().unwrap(), erase_offset as u64); + assert_eq!(window_l2.meta.size, ref_l2.meta.size); + assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); + assert_eq!(window_l2.meta.port, ref_l2.meta.port); + assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); + assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); } //TODO This needs to be reworked diff --git a/src/packet.rs b/src/packet.rs index 3df9f3c352..2241305c86 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -272,8 +272,14 @@ pub fn to_blobs( const BLOB_INDEX_END: usize = size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::() + size_of::(); const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::(); +const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::(); + +macro_rules! align { + ($x:expr, $align: expr) => ($x + ($align - 1) & !($align - 1)); +} pub const BLOB_FLAG_IS_CODING: u32 = 0x1; +pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64); impl Blob { pub fn get_index(&self) -> Result { @@ -322,14 +328,29 @@ impl Blob { self.set_flags(flags | BLOB_FLAG_IS_CODING) } + pub fn get_data_size(&self) -> Result { + let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]); + let r = rdr.read_u64::()?; + Ok(r) + } + + pub fn set_data_size(&mut self, ix: u64) -> Result<()> { + let mut wtr = vec![]; + wtr.write_u64::(ix)?; + self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr); + Ok(()) + } + pub fn data(&self) -> &[u8] { - &self.data[BLOB_FLAGS_END..] + &self.data[BLOB_HEADER_SIZE..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_FLAGS_END..] + &mut self.data[BLOB_HEADER_SIZE..] } pub fn set_size(&mut self, size: usize) { - self.meta.size = size + BLOB_FLAGS_END; + let new_size = size + BLOB_HEADER_SIZE; + self.meta.size = new_size; + self.set_data_size(new_size as u64).unwrap(); } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { let mut v = VecDeque::new();