Restore more of the blob window and add is_coding helper

This commit is contained in:
Stephen Akridge 2018-06-01 12:49:47 -07:00 committed by Greg Fitzgerald
parent 5711fb9969
commit b845245614
3 changed files with 107 additions and 74 deletions

View File

@ -1,6 +1,6 @@
// Support erasure coding // Support erasure coding
use packet::{BlobRecycler, SharedBlob, BLOB_FLAG_IS_CODING}; use packet::{BlobRecycler, SharedBlob};
use std::result; use std::result;
//TODO(sakridge) pick these values //TODO(sakridge) pick these values
@ -235,8 +235,7 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
} }
let w_l = window[n].clone().unwrap(); let w_l = window[n].clone().unwrap();
w_l.write().unwrap().meta.size = max_data_size; w_l.write().unwrap().meta.size = max_data_size;
let flags = w_l.write().unwrap().get_flags().unwrap(); if w_l.write().unwrap().set_coding().is_err() {
if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() {
return Err(ErasureError::EncodeError); return Err(ErasureError::EncodeError);
} }
coding_blobs.push( coding_blobs.push(
@ -271,74 +270,90 @@ pub fn recover(
re: &BlobRecycler, re: &BlobRecycler,
window: &mut Vec<Option<SharedBlob>>, window: &mut Vec<Option<SharedBlob>>,
consumed: usize, consumed: usize,
received: usize,
) -> Result<()> { ) -> Result<()> {
//recover with erasure coding //recover with erasure coding
let mut data_missing = 0; if received <= consumed {
let mut coded_missing = 0; return Ok(());
let block_start = consumed - (consumed % NUM_CODED);
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
/*info!(
"recover: block_start: {} coding_start: {} coding_end: {}",
block_start,
coding_start,
coding_end
);*/
for i in block_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
}
}
} }
if data_missing > 0 { let num_blocks = (received - consumed) / NUM_CODED;
if (data_missing + coded_missing) <= MAX_MISSING { let mut block_start = consumed - (consumed % NUM_CODED);
trace!("recovering: data: {} coding: {}", data_missing, coded_missing);
let mut blobs: Vec<SharedBlob> = Vec::new(); if num_blocks > 0 {
let mut locks = Vec::new(); debug!("num_blocks: {} received: {} consumed: {}", num_blocks, received, consumed);
let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); }
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut erasures: Vec<i32> = Vec::new(); for i in 0..num_blocks {
for i in block_start..coding_end { if i > 100 {
let j = i % window.len(); break;
let mut b = &mut window[j];
if b.is_some() {
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
continue;
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push(i as i32);
}
erasures.push(-1);
trace!("erasures: {:?}", erasures);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pb fn recover"));
}
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data);
}
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
} else {
return Err(ErasureError::NotEnoughBlocksToDecode);
} }
let mut data_missing = 0;
let mut coded_missing = 0;
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
trace!(
"recover: block_start: {} coding_start: {} coding_end: {}",
block_start,
coding_start,
coding_end
);
for i in block_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
}
}
}
if (data_missing + coded_missing) != NUM_CODED {
trace!("recovering: data: {} coding: {}", data_missing, coded_missing);
}
if data_missing > 0 {
if (data_missing + coded_missing) <= MAX_MISSING {
let mut blobs: Vec<SharedBlob> = Vec::new();
let mut locks = Vec::new();
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut erasures: Vec<i32> = Vec::new();
for i in block_start..coding_end {
let j = i % window.len();
let mut b = &mut window[j];
if b.is_some() {
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
continue;
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push((i - block_start) as i32);
}
erasures.push(-1);
trace!("erasures: {:?}", erasures);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pb fn recover"));
}
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data);
}
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
}
}
block_start += NUM_CODED;
} }
Ok(()) Ok(())
} }
@ -446,6 +461,7 @@ mod test {
"127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
); );
let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone()))); let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone())));
@ -481,7 +497,7 @@ mod test {
window[erase_offset] = None; window[erase_offset] = None;
// Recover it from coding // Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
println!("** after-recover:"); println!("** after-recover:");
print_window(&window); print_window(&window);
@ -522,7 +538,7 @@ mod test {
window_l0.write().unwrap().data[0] = 55; window_l0.write().unwrap().data[0] = 55;
println!("** after-nulling:"); println!("** after-nulling:");
print_window(&window); print_window(&window);
assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
println!("** after-restore:"); println!("** after-restore:");
print_window(&window); print_window(&window);
let window_l = window[offset + 1].clone().unwrap(); let window_l = window[offset + 1].clone().unwrap();

View File

@ -313,6 +313,15 @@ impl Blob {
Ok(()) Ok(())
} }
pub fn is_coding(&self) -> bool {
return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0;
}
pub fn set_coding(&mut self) -> Result<()> {
let flags = self.get_flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
}
pub fn data(&self) -> &[u8] { pub fn data(&self) -> &[u8] {
&self.data[BLOB_FLAGS_END..] &self.data[BLOB_FLAGS_END..]
} }

View File

@ -3,7 +3,7 @@
use crdt::Crdt; use crdt::Crdt;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE, BLOB_FLAG_IS_CODING}; use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
use result::{Error, Result}; use result::{Error, Result};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
@ -178,11 +178,10 @@ fn repair_window(
) -> Result<()> { ) -> Result<()> {
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
{ {
if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed).is_err() { if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed, *received).is_err() {
trace!("erasure::recover failed"); trace!("erasure::recover failed");
} }
} }
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
//exponential backoff //exponential backoff
if *last != *consumed { if *last != *consumed {
*times = 0; *times = 0;
@ -194,6 +193,7 @@ fn repair_window(
trace!("repair_window counter {} {}", *times, *consumed); trace!("repair_window counter {} {}", *times, *consumed);
return Ok(()); return Ok(());
} }
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
let sock = UdpSocket::bind("0.0.0.0:0")?; let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs { for (to, req) in reqs {
//todo cache socket //todo cache socket
@ -293,7 +293,7 @@ fn recv_window(
} }
let mut is_coding = false; let mut is_coding = false;
if let &Some(ref cblob) = &window[k] { if let &Some(ref cblob) = &window[k] {
if (cblob.read().expect("blob read lock for flags streamer::window").get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 { if cblob.read().expect("blob read lock for flags streamer::window").is_coding() {
is_coding = true; is_coding = true;
} }
} }
@ -330,7 +330,15 @@ fn print_window(
} else if v.is_none() { } else if v.is_none() {
"0" "0"
} else { } else {
"1" if let &Some(ref cblob) = &v {
if cblob.read().unwrap().is_coding() {
"C"
} else {
"1"
}
} else {
"0"
}
} }
}) })
.collect(); .collect();