diff --git a/src/erasure.rs b/src/erasure.rs index 0adb7d7ba5..7c74c72c4a 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -178,80 +178,88 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, co } // Generate coding blocks in window starting from consumed -pub fn generate_coding(window: &mut Vec>, consumed: usize) -> Result<()> { - let mut data_blobs = Vec::new(); - let mut coding_blobs = Vec::new(); - let mut data_locks = Vec::new(); - let mut data_ptrs: Vec<&[u8]> = Vec::new(); - let mut coding_locks = Vec::new(); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); +pub fn generate_coding(window: &mut Vec>, consumed: usize, num_blobs: usize) -> Result<()> { - let block_start = consumed - (consumed % NUM_CODED); - info!( - "generate_coding start: {} end: {}", - block_start, - block_start + NUM_DATA - ); - for i in block_start..block_start + NUM_DATA { - let n = i % window.len(); - trace!("window[{}] = {:?}", n, window[n]); - if window[n].is_none() { - trace!("data block is null @ {}", n); - return Ok(()); - } - data_blobs.push( - window[n] - .clone() - .expect("'data_blobs' arr in pub fn generate_coding"), - ); - } - let mut max_data_size = 0; - for b in &data_blobs { - let lck = b.write().expect("'b' write lock in pub fn generate_coding"); - if lck.meta.size > max_data_size { - max_data_size = lck.meta.size; - } - data_locks.push(lck); - } - for (i, l) in data_locks.iter_mut().enumerate() { - trace!("i: {} data: {}", i, l.data[0]); - data_ptrs.push(&l.data()[..max_data_size]); - } + let mut block_start = consumed - (consumed % NUM_CODED); - // generate coding ptr array - let coding_start = block_start + NUM_DATA; - let coding_end = block_start + NUM_CODED; - for i in coding_start..coding_end { - let n = i % window.len(); - if window[n].is_none() { - trace!("coding block is null @ {}", n); - return Ok(()); - } - let w_l = window[n].clone().unwrap(); - w_l.write().unwrap().meta.size = max_data_size; - let flags = w_l.write().unwrap().get_flags().unwrap(); - if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() { - return Err(ErasureError::EncodeError); - } - coding_blobs.push( - window[n] - .clone() - .expect("'coding_blobs' arr in pub fn generate_coding"), - ); - } - for b in &coding_blobs { - coding_locks.push( - b.write() - .expect("'coding_locks' arr in pub fn generate_coding"), - ); - } - for (i, l) in coding_locks.iter_mut().enumerate() { - trace!("i: {} coding: {}", i, l.data[0]); - coding_ptrs.push(&mut l.data_mut()[..max_data_size]); - } + let num_blocks = num_blobs / NUM_CODED; - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - trace!("consumed: {}", consumed); + for _ in 0..num_blocks { + + let mut data_blobs = Vec::new(); + let mut coding_blobs = Vec::new(); + let mut data_locks = Vec::new(); + let mut data_ptrs: Vec<&[u8]> = Vec::new(); + let mut coding_locks = Vec::new(); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); + + info!( + "generate_coding start: {} end: {}", + block_start, + block_start + NUM_DATA + ); + for i in block_start..block_start + NUM_DATA { + let n = i % window.len(); + trace!("window[{}] = {:?}", n, window[n]); + if window[n].is_none() { + trace!("data block is null @ {}", n); + return Ok(()); + } + data_blobs.push( + window[n] + .clone() + .expect("'data_blobs' arr in pub fn generate_coding"), + ); + } + let mut max_data_size = 0; + for b in &data_blobs { + let lck = b.write().expect("'b' write lock in pub fn generate_coding"); + if lck.meta.size > max_data_size { + max_data_size = lck.meta.size; + } + data_locks.push(lck); + } + for (i, l) in data_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + data_ptrs.push(&l.data()[..max_data_size]); + } + + // generate coding ptr array + let coding_start = block_start + NUM_DATA; + let coding_end = block_start + NUM_CODED; + for i in coding_start..coding_end { + let n = i % window.len(); + if window[n].is_none() { + trace!("coding block is null @ {}", n); + return Ok(()); + } + let w_l = window[n].clone().unwrap(); + w_l.write().unwrap().meta.size = max_data_size; + let flags = w_l.write().unwrap().get_flags().unwrap(); + if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() { + return Err(ErasureError::EncodeError); + } + coding_blobs.push( + window[n] + .clone() + .expect("'coding_blobs' arr in pub fn generate_coding"), + ); + } + for b in &coding_blobs { + coding_locks.push( + b.write() + .expect("'coding_locks' arr in pub fn generate_coding"), + ); + } + for (i, l) in coding_locks.iter_mut().enumerate() { + trace!("i: {} coding: {}", i, l.data[0]); + coding_ptrs.push(&mut l.data_mut()[..max_data_size]); + } + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + debug!("consumed: {}", consumed); + block_start += NUM_CODED; + } Ok(()) } @@ -416,10 +424,11 @@ mod test { data_len: usize, blob_recycler: &BlobRecycler, offset: usize, + num_blobs: usize, ) -> Vec> { let mut window = vec![None; 16]; let mut blobs = Vec::new(); - for i in 0..erasure::NUM_DATA + 2 { + for i in 0..num_blobs { let b = blob_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); @@ -441,7 +450,7 @@ mod test { let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone()))); assert!(crdt::Crdt::index_blobs(&crdt, &blobs, &mut (offset as u64)).is_ok()); - for (i, b) in blobs.into_iter().enumerate() { + for b in blobs { let idx = b.read().unwrap().get_index().unwrap() as usize; window[idx] = Some(b); } @@ -456,12 +465,13 @@ mod test { // Generate a window let offset = 1; - let mut window = generate_window(data_len, &blob_recycler, 0); + let num_blobs = erasure::NUM_DATA + 2; + let mut window = generate_window(data_len, &blob_recycler, 0, num_blobs); println!("** after-gen-window:"); print_window(&window); // Generate the coding blocks - assert!(erasure::generate_coding(&mut window, offset).is_ok()); + assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok()); println!("** after-gen-coding:"); print_window(&window); @@ -494,10 +504,11 @@ mod test { let blob_recycler = BlobRecycler::default(); let offset = 4; let data_len = 16; - let mut window = generate_window(data_len, &blob_recycler, offset); + let num_blobs = erasure::NUM_DATA + 2; + let mut window = generate_window(data_len, &blob_recycler, offset, num_blobs); println!("** after-gen:"); print_window(&window); - assert!(erasure::generate_coding(&mut window, offset).is_ok()); + assert!(erasure::generate_coding(&mut window, offset, num_blobs).is_ok()); println!("** after-coding:"); print_window(&window); let refwindow = window[offset + 1].clone(); diff --git a/src/streamer.rs b/src/streamer.rs index 0ca850ea5e..bdec788cdf 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -305,6 +305,19 @@ fn recv_window( } } } + print_window(locked_window, *consumed); + trace!("sending contq.len: {}", contq.len()); + if !contq.is_empty() { + trace!("sending contq.len: {}", contq.len()); + s.send(contq)?; + } + Ok(()) +} + +fn print_window( + locked_window: &Arc>>>, + consumed: usize, + ) { { let buf: Vec<_> = locked_window .read() @@ -312,8 +325,7 @@ fn recv_window( .iter() .enumerate() .map(|(i, v)| { - if i == (*consumed % WINDOW_SIZE) { - assert!(v.is_none()); + if i == (consumed % WINDOW_SIZE) { "_" } else if v.is_none() { "0" @@ -322,14 +334,8 @@ fn recv_window( } }) .collect(); - trace!("WINDOW: {}", buf.join("")); + info!("WINDOW ({}): {}", consumed, buf.join("")); } - trace!("sending contq.len: {}", contq.len()); - if !contq.is_empty() { - trace!("sending contq.len: {}", contq.len()); - s.send(contq)?; - } - Ok(()) } pub fn default_window() -> Arc>>> { @@ -393,7 +399,11 @@ fn broadcast( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } - let mut blobs = dq.into_iter().collect(); + let mut blobs: Vec<_> = dq.into_iter().collect(); + + let blobs_len = blobs.len(); + info!("broadcast blobs.len: {}", blobs_len); + print_window(window, *transmit_index as usize); // Insert the coding blobs into the blob stream #[cfg(feature = "erasure")] @@ -431,7 +441,7 @@ fn broadcast( // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { - if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize).is_err() + if erasure::generate_coding(&mut window.write().unwrap(), *transmit_index as usize, blobs_len).is_err() { return Err(Error::GenericError); }