This commit is contained in:
Rob Walker
2018-07-24 11:31:03 -07:00
parent 940caf7876
commit e1fc7444f9
2 changed files with 71 additions and 68 deletions

View File

@@ -295,39 +295,33 @@ pub fn generate_coding(
coding_blobs.push(coding.clone()); coding_blobs.push(coding.clone());
} }
let mut data_locks = Vec::with_capacity(NUM_DATA); let data_locks: Vec<_> = data_blobs
for b in &data_blobs { .iter()
data_locks.push( .map(|b| b.read().expect("'data_locks' of data_blobs"))
b.write() .collect();
.expect("'data_locks' write lock in pub fn generate_coding"),
);
}
let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA); let data_ptrs: Vec<_> = data_locks
for (i, l) in data_locks.iter_mut().enumerate() { .iter()
trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]); .enumerate()
data_ptrs.push(&l.data[..max_data_size]); .map(|(i, l)| {
} trace!("{:x} i: {} data: {}", debug_id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
let mut coding_locks = Vec::with_capacity(NUM_CODING); let mut coding_locks: Vec<_> = coding_blobs
for b in &coding_blobs { .iter()
coding_locks.push( .map(|b| b.write().expect("'coding_locks' of coding_blobs"))
b.write() .collect();
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); let mut coding_ptrs: Vec<_> = coding_locks
for (i, l) in coding_locks.iter_mut().enumerate() { .iter_mut()
trace!( .enumerate()
"{:x} i: {} coding: {} size: {}", .map(|(i, l)| {
debug_id, trace!("{:x} i: {} coding: {}", debug_id, i, l.data[0],);
i, &mut l.data_mut()[..max_data_size]
l.data[0], })
max_data_size .collect();
);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!( debug!(
@@ -339,6 +333,43 @@ pub fn generate_coding(
Ok(()) Ok(())
} }
// examine the window slot at idx returns
// true if slot is empty
// true if slot is stale (i.e. has the wrong index), old blob is flushed
// false if slot has a blob with the right index
fn is_missing(
debug_id: u64,
idx: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(blob) = mem::replace(window_slot, None) {
let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == idx {
trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d);
// put it back
mem::replace(window_slot, Some(blob));
false
} else {
trace!(
"recover {:x}: idx: {} old {} {}, recycling",
debug_id,
idx,
c_or_d,
blob_idx,
);
// recycle it
recycler.recycle(blob);
true
}
} else {
trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d);
// nothing there
true
}
}
// examine the window beginning at block_start for missing or // examine the window beginning at block_start for missing or
// stale (based on block_start_idx) blobs // stale (based on block_start_idx) blobs
// if a blob is stale, remove it from the window slot // if a blob is stale, remove it from the window slot
@@ -360,36 +391,6 @@ fn find_missing(
let idx = (i - block_start) as u64 + block_start_idx; let idx = (i - block_start) as u64 + block_start_idx;
let n = i % window.len(); let n = i % window.len();
fn is_missing(
debug_id: u64,
idx: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(blob) = mem::replace(window_slot, None) {
let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == idx {
trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d);
mem::replace(window_slot, Some(blob));
false
} else {
trace!(
"recover {:x}: idx: {} old {} {}, recycling",
debug_id,
idx,
c_or_d,
blob_idx,
);
recycler.recycle(blob);
true
}
} else {
trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d);
true
}
}
if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") { if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") {
data_missing += 1; data_missing += 1;
} }

View File

@@ -481,7 +481,7 @@ fn recv_window(
consumed, consumed,
); );
} }
print_window(debug_id, window, *consumed); trace!("{}", print_window(debug_id, window, *consumed));
trace!( trace!(
"{:x}: sending consume_queue.len: {}", "{:x}: sending consume_queue.len: {}",
debug_id, debug_id,
@@ -506,7 +506,7 @@ fn recv_window(
Ok(()) Ok(())
} }
fn print_window(debug_id: u64, window: &Window, consumed: u64) { fn print_window(debug_id: u64, window: &Window, consumed: u64) -> String {
let pointer: Vec<_> = window let pointer: Vec<_> = window
.read() .read()
.unwrap() .unwrap()
@@ -539,13 +539,15 @@ fn print_window(debug_id: u64, window: &Window, consumed: u64) {
} }
}) })
.collect(); .collect();
trace!( format!(
"{:x}: WINDOW ({}): {}", "\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}",
debug_id, debug_id,
consumed, consumed,
pointer.join("") pointer.join(""),
); debug_id,
trace!("{:x}: WINDOW ({}): {}", debug_id, consumed, buf.join("")); consumed,
buf.join("")
)
} }
pub fn default_window() -> Window { pub fn default_window() -> Window {
@@ -686,7 +688,7 @@ fn broadcast(
// break them up into window-sized chunks to process // break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
print_window(debug_id, window, *receive_index); trace!("{}", print_window(debug_id, window, *receive_index));
for mut blobs in blobs_chunked { for mut blobs in blobs_chunked {
let blobs_len = blobs.len(); let blobs_len = blobs.len();