diff --git a/src/erasure.rs b/src/erasure.rs index 343c275a1c..f2b047e6c6 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -334,16 +334,19 @@ pub fn recover( // if we're not missing data, or if we have too much missin but have enough coding if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING { - debug!( + trace!( "1: start: {} skipping recovery data: {} coding: {}", - block_start, data_missing, coding_missing + block_start, + data_missing, + coding_missing ); block_start += NUM_DATA; continue; } - debug!( + trace!( "2: recovering: data: {} coding: {}", - data_missing, coding_missing + data_missing, + coding_missing ); let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); @@ -359,6 +362,7 @@ pub fn recover( if meta.is_none() { let bl = window[j].data.clone().unwrap(); meta = Some(bl.read().unwrap().meta.clone()); + trace!("meta at {} {:?}", i, meta); } blobs.push( window[j] @@ -392,7 +396,7 @@ pub fn recover( window[j].coding = Some(n.clone()); //mark the missing memory blobs.push(n); - erasures.push((i - block_start + NUM_DATA) as i32); + erasures.push(((i - coding_start) + NUM_DATA) as i32); } } erasures.push(-1); @@ -406,6 +410,7 @@ pub fn recover( for b in &blobs { locks.push(b.write().expect("'locks' arr in pb fn recover")); } + { let mut coding_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_CODING); let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); @@ -427,7 +432,12 @@ pub fn recover( } for i in &erasures[..erasures.len() - 1] { let idx = *i as usize; - let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64; + let mut data_size = locks[idx].get_data_size().unwrap(); + + trace!("data_size at {} {}", *i, data_size); + + data_size -= BLOB_HEADER_SIZE as u64; + locks[idx].meta = meta.clone().unwrap(); locks[idx].set_size(data_size as usize); trace!( @@ -519,7 +529,7 @@ mod test { print!("{:>w$} ", window_l2.data()[i], w = 2); } } else { - print!("data null"); + print!("data null "); } if w.coding.is_some() { let window_l1 = w.coding.clone().unwrap(); @@ -599,30 +609,71 @@ mod test { println!("** after-gen-coding:"); print_window(&window); + println!("** whack data block:"); + // test erasing a data block let erase_offset = offset; // Create a hole in the window let refwindow = window[erase_offset].data.clone(); window[erase_offset].data = None; + print_window(&window); // Recover it from coding assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); println!("** after-recover:"); print_window(&window); - // Check the result - let window_l = window[erase_offset].data.clone().unwrap(); - let window_l2 = window_l.read().unwrap(); - let ref_l = refwindow.clone().unwrap(); - let ref_l2 = ref_l.read().unwrap(); - assert_eq!( - window_l2.data[..(data_len + BLOB_HEADER_SIZE)], - ref_l2.data[..(data_len + BLOB_HEADER_SIZE)] - ); - assert_eq!(window_l2.meta.size, ref_l2.meta.size); - assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); - assert_eq!(window_l2.meta.port, ref_l2.meta.port); - assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); - assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); + { + // Check the result, block is here to drop locks + + let window_l = window[erase_offset].data.clone().unwrap(); + let window_l2 = window_l.read().unwrap(); + let ref_l = refwindow.clone().unwrap(); + let ref_l2 = ref_l.read().unwrap(); + assert_eq!( + window_l2.data[..(data_len + BLOB_HEADER_SIZE)], + ref_l2.data[..(data_len + BLOB_HEADER_SIZE)] + ); + assert_eq!(window_l2.meta.size, ref_l2.meta.size); + assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); + assert_eq!(window_l2.meta.port, ref_l2.meta.port); + assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); + assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); + } + + println!("** whack coding block and data block"); + // test erasing a coding block + + let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING; + // Create a hole in the window + + blob_recycler.recycle(window[erase_offset].data.clone().unwrap()); + window[erase_offset].data = None; + + let refwindow = window[erase_offset].coding.clone(); + window[erase_offset].coding = None; + print_window(&window); + + // Recover it from coding + assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); + println!("** after-recover:"); + print_window(&window); + + { + // Check the result, block is here to drop locks + let window_l = window[erase_offset].coding.clone().unwrap(); + let window_l2 = window_l.read().unwrap(); + let ref_l = refwindow.clone().unwrap(); + let ref_l2 = ref_l.read().unwrap(); + assert_eq!( + window_l2.data[..(data_len + BLOB_HEADER_SIZE)], + ref_l2.data[..(data_len + BLOB_HEADER_SIZE)] + ); + assert_eq!(window_l2.meta.size, ref_l2.meta.size); + assert_eq!(window_l2.meta.addr, ref_l2.meta.addr); + assert_eq!(window_l2.meta.port, ref_l2.meta.port); + assert_eq!(window_l2.meta.v6, ref_l2.meta.v6); + assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64); + } } // //TODO This needs to be reworked diff --git a/src/packet.rs b/src/packet.rs index c7bee79c97..3604934324 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -25,7 +25,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; pub const PACKET_DATA_SIZE: usize = 256; pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] #[repr(C)] pub struct Meta { pub size: usize, diff --git a/src/streamer.rs b/src/streamer.rs index fb38542e30..6449ef9e51 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -30,25 +30,6 @@ pub struct WindowSlot { pub coding: Option, } -//impl Copy for WindowSlot {} - -//impl Clone for WindowSlot { -// fn clone(&self) -> WindowSlot { -// WindowSlot { -// data: if self.data.is_some() { -// Some(self.data.clone()) -// } else { -// None -// }, -// coding: if self.coding.is_some() { -// Some(self.coding.clone()) -// } else { -// None -// }, -// } -// } -//} - pub type Window = Arc>>; #[derive(Debug, PartialEq, Eq)] @@ -217,18 +198,6 @@ fn repair_window( consumed: &mut u64, received: &mut u64, ) -> Result<()> { - #[cfg(feature = "erasure")] - { - if erasure::recover( - _recycler, - &mut locked_window.write().unwrap(), - *consumed as usize, - *received as usize, - ).is_err() - { - trace!("erasure::recover failed"); - } - } //exponential backoff if *last != *consumed { *times = 0; @@ -354,6 +323,7 @@ fn process_blob( debug_id: u64, recycler: &BlobRecycler, consumed: &mut u64, + received: u64, ) { let mut window = locked_window.write().unwrap(); @@ -361,13 +331,24 @@ fn process_blob( // of consumed to received and clear any old ones for ix in *consumed..(pix + 1) { let k = (ix % WINDOW_SIZE) as usize; - if let Some(b) = &mut window[k].data { - if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { - continue; + + let mut old = false; + if let Some(b) = &window[k].data { + old = b.read().unwrap().get_index().unwrap() < *consumed as u64; + } + if old { + if let Some(b) = mem::replace(&mut window[k].data, None) { + recycler.recycle(b); } } - if let Some(b) = mem::replace(&mut window[k].data, None) { - recycler.recycle(b); + let mut old = false; + if let Some(b) = &window[k].coding { + old = b.read().unwrap().get_index().unwrap() < *consumed as u64; + } + if old { + if let Some(b) = mem::replace(&mut window[k].coding, None) { + recycler.recycle(b); + } } } @@ -384,9 +365,9 @@ fn process_blob( window[w].coding = Some(b); } else if let Some(blob) = &window[w].coding { if blob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun blob at index {:}", debug_id, w); + warn!("{:x}: overrun coding blob at index {:}", debug_id, w); } else { - debug!("{:x}: duplicate blob at index {:}", debug_id, w); + debug!("{:x}: duplicate coding blob at index {:}", debug_id, w); } } } else { @@ -394,13 +375,20 @@ fn process_blob( window[w].data = Some(b); } else if let Some(blob) = &window[w].data { if blob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun blob at index {:}", debug_id, w); + warn!("{:x}: overrun data blob at index {:}", debug_id, w); } else { - debug!("{:x}: duplicate blob at index {:}", debug_id, w); + debug!("{:x}: duplicate data blob at index {:}", debug_id, w); } } } + #[cfg(feature = "erasure")] + { + if erasure::recover(recycler, &mut window, *consumed as usize, received as usize).is_err() { + trace!("erasure::recover failed"); + } + } + // push all contiguous blobs into consumed queue, increment consumed loop { let k = (*consumed % WINDOW_SIZE) as usize; @@ -487,6 +475,7 @@ fn recv_window( debug_id, recycler, consumed, + *received, ); } print_window(debug_id, locked_window, *consumed); @@ -507,30 +496,28 @@ fn recv_window( } fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { - { - let buf: Vec<_> = locked_window - .read() - .unwrap() - .iter() - .enumerate() - .map(|(i, v)| { - if i == (consumed % WINDOW_SIZE) as usize { - "_" - } else if v.data.is_none() { - "0" - } else if let Some(ref cblob) = v.data { - if cblob.read().unwrap().is_coding() { - "C" - } else { - "1" - } - } else { - "0" - } - }) - .collect(); - trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join("")); - } + let buf: Vec<_> = locked_window + .read() + .unwrap() + .iter() + .enumerate() + .map(|(i, v)| { + if i == (consumed % WINDOW_SIZE) as usize { + "_" + } else if v.data.is_none() && v.coding.is_none() { + "0" + } else if v.data.is_some() && v.coding.is_some() { + "X" + } else if v.data.is_some() { + // coding.is_none() + "D" + } else { + // data.is_none() + "C" + } + }) + .collect(); + trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join("")); } pub fn default_window() -> Window { @@ -686,6 +673,15 @@ fn broadcast( ); recycler.recycle(x); } + if let Some(x) = mem::replace(&mut win[pos].coding, None) { + trace!( + "popped {} at {}", + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x); + } + trace!("null {}", pos); } while let Some(b) = blobs.pop() {