protect generate and recover from u64->usize casting issues

This commit is contained in:
Rob Walker
2018-07-18 17:59:44 -07:00
parent 8d2bd43100
commit 6656ec816c
3 changed files with 146 additions and 148 deletions

View File

@@ -181,114 +181,113 @@ pub fn decode_blocks(
pub fn generate_coding( pub fn generate_coding(
window: &mut [WindowSlot], window: &mut [WindowSlot],
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: usize, start_idx: usize,
num_blobs: usize, num_blobs: usize,
) -> Result<()> { ) -> Result<()> {
let mut block_start = consumed - (consumed % NUM_DATA); let mut block_start = start_idx - (start_idx % NUM_DATA);
for i in consumed..consumed + num_blobs { loop {
if (i % NUM_DATA) == (NUM_DATA - 1) { if (block_start + NUM_DATA) > (start_idx + num_blobs) {
info!( break;
"generate_coding start: {} end: {} consumed: {} num_blobs: {}",
block_start,
block_start + NUM_DATA,
consumed,
num_blobs
);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
let mut max_data_size = 0;
for i in block_start..block_start + NUM_DATA {
let n = i % window.len();
trace!("window[{}] = {:?}", n, window[n].data);
if window[n].data.is_none() {
trace!("data block is null @ {}", n);
return Ok(());
}
let data = window[n].data.clone().unwrap();
max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size);
data_blobs.push(data);
}
trace!("max_data_size: {}", max_data_size);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_start + NUM_DATA - NUM_CODING;
let coding_end = block_start + NUM_DATA;
for i in coding_start..coding_end {
let n = i % window.len();
if window[n].coding.is_none() {
window[n].coding = Some(recycler.allocate());
}
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write().unwrap();
{
// copy index and id from the data blob
let data = window[n].data.clone().unwrap();
let data_rl = data.read().unwrap();
coding_wl.set_index(data_rl.get_index().unwrap()).unwrap();
coding_wl.set_id(data_rl.get_id().unwrap()).unwrap();
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
return Err(ErasureError::EncodeError);
}
trace!("coding {:?}", coding_wl.meta);
trace!("coding.data_size {}", coding_wl.get_data_size().unwrap());
coding_blobs.push(
window[n]
.coding
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
trace!("max_data_size {}", max_data_size);
let mut data_locks = Vec::with_capacity(NUM_DATA);
for b in &data_blobs {
data_locks.push(
b.write()
.expect("'data_locks' write lock in pub fn generate_coding"),
);
}
let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA);
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data[..max_data_size]);
}
let mut coding_locks = Vec::with_capacity(NUM_CODING);
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"consumed: {} data: {}:{} coding: {}:{}",
consumed,
block_start,
block_start + NUM_DATA,
coding_start,
coding_end
);
block_start += NUM_DATA;
} }
info!(
"generate_coding start: {} end: {} start_idx: {} num_blobs: {}",
block_start,
block_start + NUM_DATA,
start_idx,
num_blobs
);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
let mut max_data_size = 0;
for i in block_start..block_start + NUM_DATA {
let n = i % window.len();
trace!("window[{}] = {:?}", n, window[n].data);
if window[n].data.is_none() {
trace!("data block is null @ {}", n);
return Ok(());
}
let data = window[n].data.clone().unwrap();
max_data_size = cmp::max(data.read().unwrap().meta.size, max_data_size);
data_blobs.push(data);
}
trace!("max_data_size: {}", max_data_size);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_start + NUM_DATA - NUM_CODING;
let coding_end = block_start + NUM_DATA;
for i in coding_start..coding_end {
let n = i % window.len();
if window[n].coding.is_none() {
window[n].coding = Some(recycler.allocate());
}
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write().unwrap();
{
// copy index and id from the data blob
let data = window[n].data.clone().unwrap();
let data_rl = data.read().unwrap();
coding_wl.set_index(data_rl.get_index().unwrap()).unwrap();
coding_wl.set_id(data_rl.get_id().unwrap()).unwrap();
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
return Err(ErasureError::EncodeError);
}
coding_blobs.push(
window[n]
.coding
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
trace!("max_data_size {}", max_data_size);
let mut data_locks = Vec::with_capacity(NUM_DATA);
for b in &data_blobs {
data_locks.push(
b.write()
.expect("'data_locks' write lock in pub fn generate_coding"),
);
}
let mut data_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_DATA);
for (i, l) in data_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);
data_ptrs.push(&l.data[..max_data_size]);
}
let mut coding_locks = Vec::with_capacity(NUM_CODING);
for b in &coding_blobs {
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} coding: {} size: {}", i, l.data[0], max_data_size);
coding_ptrs.push(&mut l.data_mut()[..max_data_size]);
}
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"start_idx: {} data: {}:{} coding: {}:{}",
start_idx,
block_start,
block_start + NUM_DATA,
coding_start,
coding_end
);
block_start += NUM_DATA;
} }
Ok(()) Ok(())
} }
@@ -300,27 +299,20 @@ pub fn generate_coding(
pub fn recover( pub fn recover(
recycler: &BlobRecycler, recycler: &BlobRecycler,
window: &mut [WindowSlot], window: &mut [WindowSlot],
consumed: usize, start: usize,
received: usize, num_blobs: usize,
) -> Result<()> { ) -> Result<()> {
//recover with erasure coding let num_blocks = num_blobs / NUM_DATA;
if received <= consumed { let mut block_start = start - (start % NUM_DATA);
return Ok(());
}
let num_blocks = (received - consumed) / NUM_DATA;
let mut block_start = consumed - (consumed % NUM_DATA);
if num_blocks > 0 { if num_blocks > 0 {
debug!( debug!(
"num_blocks: {} received: {} consumed: {}", "num_blocks: {} start: {} num_blobs: {} block_start: {}",
num_blocks, received, consumed num_blocks, start, num_blobs, block_start
); );
} }
for i in 0..num_blocks { for _ in 0..num_blocks {
if i > 100 {
break;
}
let mut data_missing = 0; let mut data_missing = 0;
let mut coding_missing = 0; let mut coding_missing = 0;
let coding_start = block_start + NUM_DATA - NUM_CODING; let coding_start = block_start + NUM_DATA - NUM_CODING;
@@ -635,7 +627,7 @@ mod test {
print_window(&window); print_window(&window);
// Recover it from coding // Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok());
println!("** after-recover:"); println!("** after-recover:");
print_window(&window); print_window(&window);
@@ -658,26 +650,25 @@ mod test {
} }
println!("** whack coding block and data block"); println!("** whack coding block and data block");
// test erasing a coding block // tests erasing a coding block
let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING; let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING;
// Create a hole in the window // Create a hole in the window
let refwindow = window[erase_offset].data.clone();
blob_recycler.recycle(window[erase_offset].data.clone().unwrap());
window[erase_offset].data = None; window[erase_offset].data = None;
let refwindow = window[erase_offset].coding.clone(); blob_recycler.recycle(window[erase_offset].coding.clone().unwrap());
window[erase_offset].coding = None; window[erase_offset].coding = None;
print_window(&window); print_window(&window);
// Recover it from coding // Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); assert!(erasure::recover(&blob_recycler, &mut window, offset, num_blobs).is_ok());
println!("** after-recover:"); println!("** after-recover:");
print_window(&window); print_window(&window);
{ {
// Check the result, block is here to drop locks // Check the result, block is here to drop locks
let window_l = window[erase_offset].coding.clone().unwrap(); let window_l = window[erase_offset].data.clone().unwrap();
let window_l2 = window_l.read().unwrap(); let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap(); let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap(); let ref_l2 = ref_l.read().unwrap();

View File

@@ -47,6 +47,7 @@ pub fn reconstruct_entries_from_blobs(blobs: VecDeque<SharedBlob>) -> bincode::R
for blob in blobs { for blob in blobs {
let entry = { let entry = {
let msg = blob.read().unwrap(); let msg = blob.read().unwrap();
deserialize(&msg.data()[..msg.meta.size]) deserialize(&msg.data()[..msg.meta.size])
}; };

View File

@@ -164,7 +164,7 @@ pub fn blob_receiver(
} }
fn find_next_missing( fn find_next_missing(
locked_window: &Window, window: &Window,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
consumed: &mut u64, consumed: &mut u64,
received: &mut u64, received: &mut u64,
@@ -172,7 +172,7 @@ fn find_next_missing(
if *received <= *consumed { if *received <= *consumed {
Err(WindowError::GenericError)?; Err(WindowError::GenericError)?;
} }
let window = locked_window.read().unwrap(); let window = window.read().unwrap();
let reqs: Vec<_> = (*consumed..*received) let reqs: Vec<_> = (*consumed..*received)
.filter_map(|pix| { .filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize; let i = (pix % WINDOW_SIZE) as usize;
@@ -190,9 +190,8 @@ fn find_next_missing(
fn repair_window( fn repair_window(
debug_id: u64, debug_id: u64,
locked_window: &Window, window: &Window,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
_recycler: &BlobRecycler,
last: &mut u64, last: &mut u64,
times: &mut usize, times: &mut usize,
consumed: &mut u64, consumed: &mut u64,
@@ -215,7 +214,7 @@ fn repair_window(
return Ok(()); return Ok(());
} }
let reqs = find_next_missing(locked_window, crdt, consumed, received)?; let reqs = find_next_missing(window, crdt, consumed, received)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if !reqs.is_empty() { if !reqs.is_empty() {
inc_new_counter!("streamer-repair_window-repair", reqs.len()); inc_new_counter!("streamer-repair_window-repair", reqs.len());
@@ -308,7 +307,7 @@ fn retransmit_all_leader_blocks(
/// the entry height of this blob /// the entry height of this blob
/// * `w` - the index this blob would land at within the window /// * `w` - the index this blob would land at within the window
/// * `consume_queue` - output, blobs to be rebroadcast are placed here /// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `locked_window` - the window we're operating on /// * `window` - the window we're operating on
/// * `debug_id` - this node's id in a useful-for-debug format /// * `debug_id` - this node's id in a useful-for-debug format
/// * `recycler` - where to return the blob once processed, also where /// * `recycler` - where to return the blob once processed, also where
/// to return old blobs from the window /// to return old blobs from the window
@@ -319,13 +318,13 @@ fn process_blob(
pix: u64, pix: u64,
w: usize, w: usize,
consume_queue: &mut SharedBlobs, consume_queue: &mut SharedBlobs,
locked_window: &Window, window: &Window,
debug_id: u64, debug_id: u64,
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: &mut u64, consumed: &mut u64,
received: u64, received: u64,
) { ) {
let mut window = locked_window.write().unwrap(); let mut window = window.write().unwrap();
// Search the window for old blobs in the window // Search the window for old blobs in the window
// of consumed to received and clear any old ones // of consumed to received and clear any old ones
@@ -334,7 +333,7 @@ fn process_blob(
let mut old = false; let mut old = false;
if let Some(b) = &window[k].data { if let Some(b) = &window[k].data {
old = b.read().unwrap().get_index().unwrap() < *consumed as u64; old = b.read().unwrap().get_index().unwrap() < *consumed;
} }
if old { if old {
if let Some(b) = mem::replace(&mut window[k].data, None) { if let Some(b) = mem::replace(&mut window[k].data, None) {
@@ -343,7 +342,7 @@ fn process_blob(
} }
let mut old = false; let mut old = false;
if let Some(b) = &window[k].coding { if let Some(b) = &window[k].coding {
old = b.read().unwrap().get_index().unwrap() < *consumed as u64; old = b.read().unwrap().get_index().unwrap() < *consumed;
} }
if old { if old {
if let Some(b) = mem::replace(&mut window[k].coding, None) { if let Some(b) = mem::replace(&mut window[k].coding, None) {
@@ -384,7 +383,13 @@ fn process_blob(
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
{ {
if erasure::recover(recycler, &mut window, *consumed as usize, received as usize).is_err() { if erasure::recover(
recycler,
&mut window,
(*consumed % WINDOW_SIZE) as usize,
(received - *consumed) as usize,
).is_err()
{
trace!("erasure::recover failed"); trace!("erasure::recover failed");
} }
} }
@@ -397,14 +402,20 @@ fn process_blob(
if window[k].data.is_none() { if window[k].data.is_none() {
break; break;
} }
if let Some(blob) = &window[w].data {
assert!(blob.read().unwrap().meta.size < BLOB_SIZE);
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1; *consumed += 1;
if *consumed % WINDOW_SIZE == 0 {
eprintln!("window wrapped, consumed {}", *consumed);
}
} }
} }
fn recv_window( fn recv_window(
debug_id: u64, debug_id: u64,
locked_window: &Window, window: &Window,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: &mut u64, consumed: &mut u64,
@@ -471,14 +482,14 @@ fn recv_window(
pix, pix,
w, w,
&mut consume_queue, &mut consume_queue,
locked_window, window,
debug_id, debug_id,
recycler, recycler,
consumed, consumed,
*received, *received,
); );
} }
print_window(debug_id, locked_window, *consumed); print_window(debug_id, window, *consumed);
trace!("sending consume_queue.len: {}", consume_queue.len()); trace!("sending consume_queue.len: {}", consume_queue.len());
if !consume_queue.is_empty() { if !consume_queue.is_empty() {
debug!( debug!(
@@ -495,8 +506,8 @@ fn recv_window(
Ok(()) Ok(())
} }
fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { fn print_window(debug_id: u64, window: &Window, consumed: u64) {
let buf: Vec<_> = locked_window let buf: Vec<_> = window
.read() .read()
.unwrap() .unwrap()
.iter() .iter()
@@ -609,7 +620,6 @@ pub fn window(
debug_id, debug_id,
&window, &window,
&crdt, &crdt,
&recycler,
&mut last, &mut last,
&mut times, &mut times,
&mut consumed, &mut consumed,
@@ -648,10 +658,6 @@ fn broadcast(
print_window(me.debug_id(), window, *receive_index); print_window(me.debug_id(), window, *receive_index);
for mut blobs in blobs_chunked { for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream
// #[cfg(feature = "erasure")]
// erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len(); let blobs_len = blobs.len();
debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);
@@ -699,7 +705,7 @@ fn broadcast(
erasure::generate_coding( erasure::generate_coding(
&mut window.write().unwrap(), &mut window.write().unwrap(),
recycler, recycler,
*receive_index as usize, (*receive_index % WINDOW_SIZE) as usize,
blobs_len, blobs_len,
)?; )?;
} }