diff --git a/src/erasure.rs b/src/erasure.rs index ad922e301c..782592f27a 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,11 +1,11 @@ // Support erasure coding -use packet::{BlobRecycler, SharedBlob}; +use packet::{BlobRecycler, SharedBlob, BLOB_FLAG_IS_CODING}; use std::result; //TODO(sakridge) pick these values -const NUM_CODED: usize = 10; -const MAX_MISSING: usize = 2; +const NUM_CODED: usize = 3; +const MAX_MISSING: usize = 1; const NUM_DATA: usize = NUM_CODED - MAX_MISSING; #[derive(Debug, PartialEq, Eq)] @@ -73,12 +73,14 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul let mut data_arg = Vec::new(); for block in data { if block_len != block.len() { + trace!("data block size incorrect {} expected {}", block.len(), block_len); return Err(ErasureError::InvalidBlockSize); } data_arg.push(block.as_ptr()); } for mut block in coding { if block_len != block.len() { + trace!("coding block size incorrect {} expected {}", block.len(), block_len); return Err(ErasureError::InvalidBlockSize); } coding_arg.push(block.as_mut_ptr()); @@ -153,8 +155,8 @@ pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32]) // Allocate some coding blobs and insert into the blobs array pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, consumed: u64) { let num_data_segments = blobs.len() / NUM_DATA; - trace!( - "num_data: {} blobs.len(): {}", + info!( + "add_coding num_data: {} blobs.len(): {}", num_data_segments, blobs.len() ); @@ -164,6 +166,10 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, co trace!("putting coding at {}", j); if j <= blobs.len() { let new_blob = recycler.allocate(); + let new_blob_clone = new_blob.clone(); + let mut new_blob_l = new_blob_clone.write().unwrap(); + new_blob_l.meta.size = new_blob_l.data().len(); + drop(new_blob_l); blobs.insert(j, new_blob); } } @@ -180,8 +186,8 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize) -> let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); let block_start = consumed - (consumed % NUM_CODED); - trace!( - "generate start: {} end: {}", + info!( + "generate_coding start: {} end: {}", block_start, block_start + NUM_DATA ); @@ -198,12 +204,17 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize) -> .expect("'data_blobs' arr in pub fn generate_coding"), ); } + let mut max_data_size = 0; for b in &data_blobs { - data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding")); + let lck = b.write().expect("'b' write lock in pub fn generate_coding"); + if lck.meta.size > max_data_size { + max_data_size = lck.meta.size; + } + data_locks.push(lck); } for (i, l) in data_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); - data_ptrs.push(&l.data); + data_ptrs.push(&l.data()[..max_data_size]); } // generate coding ptr array @@ -215,6 +226,10 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize) -> trace!("coding block is null @ {}", n); return Ok(()); } + let w_l = window[n].clone().unwrap(); + w_l.write().unwrap().meta.size = max_data_size; + let flags = w_l.write().unwrap().get_flags().unwrap(); + w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING); coding_blobs.push( window[n] .clone() @@ -229,7 +244,7 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize) -> } for (i, l) in coding_locks.iter_mut().enumerate() { trace!("i: {} coding: {}", i, l.data[0]); - coding_ptrs.push(&mut l.data); + coding_ptrs.push(&mut l.data_mut()[..max_data_size]); } generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; @@ -252,12 +267,12 @@ pub fn recover( let block_start = consumed - (consumed % NUM_CODED); let coding_start = block_start + NUM_DATA; let coding_end = block_start + NUM_CODED; - trace!( - "block_start: {} coding_start: {} coding_end: {}", + /*info!( + "recover: block_start: {} coding_start: {} coding_end: {}", block_start, coding_start, coding_end - ); + );*/ for i in block_start..coding_end { let n = i % window.len(); if window[n].is_none() { @@ -268,9 +283,9 @@ pub fn recover( } } } - trace!("missing: data: {} coding: {}", data_missing, coded_missing); if data_missing > 0 { if (data_missing + coded_missing) <= MAX_MISSING { + trace!("recovering: data: {} coding: {}", data_missing, coded_missing); let mut blobs: Vec = Vec::new(); let mut locks = Vec::new(); let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); @@ -322,6 +337,10 @@ mod test { use erasure; use logger; use packet::{BlobRecycler, SharedBlob}; + use crdt; + use std::sync::{Arc, RwLock}; + use signature::KeyPair; + use signature::KeyPairUtil; #[test] pub fn test_coding() { @@ -377,10 +396,11 @@ mod test { for (i, w) in window.iter().enumerate() { print!("window({}): ", i); if w.is_some() { - let window_lock = w.clone().unwrap(); - let window_data = window_lock.read().unwrap().data; + let window_l1 = w.clone().unwrap(); + let window_l2 = window_l1.read().unwrap(); + print!("index: {:?} meta.size: {} data: ", window_l2.get_index(), window_l2.meta.size); for i in 0..8 { - print!("{} ", window_data[i]); + print!("{} ", window_l2.data()[i]); } } else { print!("null"); @@ -400,8 +420,6 @@ mod test { let b = blob_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); - w.set_index(i as u64).unwrap(); - assert_eq!(i as u64, w.get_index().unwrap()); w.meta.size = data_len; for k in 0..data_len { w.data_mut()[k] = (k + i) as u8; @@ -409,8 +427,20 @@ mod test { blobs.push(b_); } erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64); + + let d = crdt::ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + ); + let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone()))); + + assert!(crdt::Crdt::index_blobs(&crdt, &blobs, &mut (offset as u64)).is_ok()); for (i, b) in blobs.into_iter().enumerate() { - window[i] = Some(b); + let idx = b.read().unwrap().get_index().unwrap() as usize; + window[idx] = Some(b); } window } @@ -422,7 +452,7 @@ mod test { let blob_recycler = BlobRecycler::default(); // Generate a window - let offset = 4; + let offset = 1; let mut window = generate_window(data_len, &blob_recycler, 0); println!("** after-gen-window:"); print_window(&window); @@ -432,9 +462,10 @@ mod test { println!("** after-gen-coding:"); print_window(&window); + let erase_offset = offset; // Create a hole in the window - let refwindow = window[offset + 1].clone(); - window[offset + 1] = None; + let refwindow = window[erase_offset].clone(); + window[erase_offset] = None; // Recover it from coding assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); @@ -442,12 +473,14 @@ mod test { print_window(&window); // Check the result - let window_l = window[offset + 1].clone().unwrap(); + let window_l = window[erase_offset].clone().unwrap(); let ref_l = refwindow.clone().unwrap(); assert_eq!( window_l.read().unwrap().data()[..data_len], ref_l.read().unwrap().data()[..data_len] ); + assert_eq!(window_l.read().unwrap().meta.size, data_len); + assert_eq!(window_l.read().unwrap().get_index().unwrap(), erase_offset as u64); } //TODO This needs to be reworked diff --git a/src/packet.rs b/src/packet.rs index 4441565ef4..d7d6b0020f 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -271,6 +271,9 @@ pub fn to_blobs( const BLOB_INDEX_END: usize = size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::() + size_of::(); +const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::(); + +pub const BLOB_FLAG_IS_CODING: u32 = 0x1; impl Blob { pub fn get_index(&self) -> Result { @@ -297,14 +300,27 @@ impl Blob { Ok(()) } + pub fn get_flags(&self) -> Result { + let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]); + let r = rdr.read_u32::()?; + Ok(r) + } + + pub fn set_flags(&mut self, ix: u32) -> Result<()> { + let mut wtr = vec![]; + wtr.write_u32::(ix)?; + self.data[BLOB_ID_END..BLOB_FLAGS_END].clone_from_slice(&wtr); + Ok(()) + } + pub fn data(&self) -> &[u8] { - &self.data[BLOB_ID_END..] + &self.data[BLOB_FLAGS_END..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_ID_END..] + &mut self.data[BLOB_FLAGS_END..] } pub fn set_size(&mut self, size: usize) { - self.meta.size = size + BLOB_ID_END; + self.meta.size = size + BLOB_FLAGS_END; } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { let mut v = VecDeque::new(); diff --git a/src/streamer.rs b/src/streamer.rs index 86ad3264c9..4600ec110d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -3,7 +3,7 @@ use crdt::Crdt; #[cfg(feature = "erasure")] use erasure; -use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; +use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE, BLOB_FLAG_IS_CODING}; use result::{Error, Result}; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; @@ -290,6 +290,11 @@ fn recv_window( if window[k].is_none() { break; } + let w_l1 = window[k].clone().unwrap(); + let w_l2 = w_l1.read().unwrap(); + if (w_l2.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 { + break; + } contq.push_back(window[k].clone().expect("clone in fn recv_window")); window[k] = None; *consumed += 1;