diff --git a/src/erasure.rs b/src/erasure.rs index 8eea39ca55..5e0f9a4650 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -295,39 +295,33 @@ pub fn generate_coding( coding_blobs.push(coding.clone()); } - let mut data_locks = Vec::with_capacity(NUM_DATA); - for b in &data_blobs { - data_locks.push( - b.write() - .expect("'data_locks' write lock in pub fn generate_coding"), - ); - } + let data_locks: Vec<_> = data_blobs + .iter() + .map(|b| b.read().expect("'data_locks' of data_blobs")) + .collect(); - let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA); - for (i, l) in data_locks.iter_mut().enumerate() { - trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]); - data_ptrs.push(&l.data[..max_data_size]); - } + let data_ptrs: Vec<_> = data_locks + .iter() + .enumerate() + .map(|(i, l)| { + trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]); + &l.data[..max_data_size] + }) + .collect(); - let mut coding_locks = Vec::with_capacity(NUM_CODING); - for b in &coding_blobs { - coding_locks.push( - b.write() - .expect("'coding_locks' arr in pub fn generate_coding"), - ); - } + let mut coding_locks: Vec<_> = coding_blobs + .iter() + .map(|b| b.write().expect("'coding_locks' of coding_blobs")) + .collect(); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - for (i, l) in coding_locks.iter_mut().enumerate() { - trace!( - "{:x} i: {} coding: {} size: {}", - debug_id, - i, - l.data[0], - max_data_size - ); - coding_ptrs.push(&mut l.data_mut()[..max_data_size]); - } + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .enumerate() + .map(|(i, l)| { + trace!("{:x} i: {} coding: {}", debug_id, i, l.data[0],); + &mut l.data_mut()[..max_data_size] + }) + .collect(); generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; debug!( @@ -339,6 +333,43 @@ pub fn generate_coding( Ok(()) } +// examine the window slot at idx returns +// true if slot is empty +// true if slot is stale (i.e. has the wrong index), old blob is flushed +// false if slot has a blob with the right index +fn is_missing( + debug_id: u64, + idx: u64, + window_slot: &mut Option, + recycler: &BlobRecycler, + c_or_d: &str, +) -> bool { + if let Some(blob) = mem::replace(window_slot, None) { + let blob_idx = blob.read().unwrap().get_index().unwrap(); + if blob_idx == idx { + trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d); + // put it back + mem::replace(window_slot, Some(blob)); + false + } else { + trace!( + "recover {:x}: idx: {} old {} {}, recycling", + debug_id, + idx, + c_or_d, + blob_idx, + ); + // recycle it + recycler.recycle(blob); + true + } + } else { + trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d); + // nothing there + true + } +} + // examine the window beginning at block_start for missing or // stale (based on block_start_idx) blobs // if a blob is stale, remove it from the window slot @@ -360,36 +391,6 @@ fn find_missing( let idx = (i - block_start) as u64 + block_start_idx; let n = i % window.len(); - fn is_missing( - debug_id: u64, - idx: u64, - window_slot: &mut Option, - recycler: &BlobRecycler, - c_or_d: &str, - ) -> bool { - if let Some(blob) = mem::replace(window_slot, None) { - let blob_idx = blob.read().unwrap().get_index().unwrap(); - if blob_idx == idx { - trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d); - mem::replace(window_slot, Some(blob)); - false - } else { - trace!( - "recover {:x}: idx: {} old {} {}, recycling", - debug_id, - idx, - c_or_d, - blob_idx, - ); - recycler.recycle(blob); - true - } - } else { - trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d); - true - } - } - if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") { data_missing += 1; } diff --git a/src/streamer.rs b/src/streamer.rs index 08c9d40718..b8ad3be0ef 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -481,7 +481,7 @@ fn recv_window( consumed, ); } - print_window(debug_id, window, *consumed); + trace!("{}", print_window(debug_id, window, *consumed)); trace!( "{:x}: sending consume_queue.len: {}", debug_id, @@ -506,7 +506,7 @@ fn recv_window( Ok(()) } -fn print_window(debug_id: u64, window: &Window, consumed: u64) { +fn print_window(debug_id: u64, window: &Window, consumed: u64) -> String { let pointer: Vec<_> = window .read() .unwrap() @@ -539,13 +539,15 @@ fn print_window(debug_id: u64, window: &Window, consumed: u64) { } }) .collect(); - trace!( - "{:x}: WINDOW ({}): {}", + format!( + "\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}", debug_id, consumed, - pointer.join("") - ); - trace!("{:x}: WINDOW ({}): {}", debug_id, consumed, buf.join("")); + pointer.join(""), + debug_id, + consumed, + buf.join("") + ) } pub fn default_window() -> Window { @@ -686,7 +688,7 @@ fn broadcast( // break them up into window-sized chunks to process let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); - print_window(debug_id, window, *receive_index); + trace!("{}", print_window(debug_id, window, *receive_index)); for mut blobs in blobs_chunked { let blobs_len = blobs.len();