building now

This commit is contained in:
Rob Walker
2018-07-17 15:00:22 -07:00
parent dab98dcd81
commit 257acdcda1
4 changed files with 124 additions and 79 deletions

View File

@ -578,10 +578,10 @@ impl Crdt {
for i in *transmit_index..received_index { for i in *transmit_index..received_index {
let is = i as usize; let is = i as usize;
let k = is % window_l.len(); let k = is % window_l.len();
assert!(window_l[k].is_some()); assert!(window_l[k].data.is_some());
let pos = is % broadcast_table.len(); let pos = is % broadcast_table.len();
orders.push((window_l[k].clone(), &broadcast_table[pos])); orders.push((window_l[k].data.clone(), &broadcast_table[pos]));
} }
trace!("broadcast orders table {}", orders.len()); trace!("broadcast orders table {}", orders.len());
@ -917,7 +917,7 @@ impl Crdt {
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> { ) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len(); let pos = (ix as usize) % window.read().unwrap().len();
if let Some(blob) = &window.read().unwrap()[pos] { if let Some(blob) = &window.read().unwrap()[pos].data {
let mut wblob = blob.write().unwrap(); let mut wblob = blob.write().unwrap();
let blob_ix = wblob.get_index().expect("run_window_request get_index"); let blob_ix = wblob.get_index().expect("run_window_request get_index");
if blob_ix == ix { if blob_ix == ix {
@ -960,7 +960,7 @@ impl Crdt {
} }
} else { } else {
inc_new_counter!("crdt-window-request-fail", 1); inc_new_counter!("crdt-window-request-fail", 1);
assert!(window.read().unwrap()[pos].is_none()); assert!(window.read().unwrap()[pos].data.is_none());
info!( info!(
"{:x}: failed RequestWindowIndex {:x} {} {}", "{:x}: failed RequestWindowIndex {:x} {} {}",
me.debug_id(), me.debug_id(),
@ -1809,7 +1809,7 @@ mod tests {
assert!(rv.is_none()); assert!(rv.is_none());
let out = recycler.allocate(); let out = recycler.allocate();
out.write().unwrap().meta.size = 200; out.write().unwrap().meta.size = 200;
window.write().unwrap()[0] = Some(out); window.write().unwrap()[0].data = Some(out);
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
assert!(rv.is_some()); assert!(rv.is_some());
let v = rv.unwrap(); let v = rv.unwrap();
@ -1838,7 +1838,7 @@ mod tests {
let blob = recycler.allocate(); let blob = recycler.allocate();
let blob_size = 200; let blob_size = 200;
blob.write().unwrap().meta.size = blob_size; blob.write().unwrap().meta.size = blob_size;
window.write().unwrap()[0] = Some(blob); window.write().unwrap()[0].data = Some(blob);
let num_requests: u32 = 64; let num_requests: u32 = 64;
for i in 0..num_requests { for i in 0..num_requests {

View File

@ -223,8 +223,8 @@ pub fn generate_coding(
} }
let mut coding_blobs = Vec::with_capacity(NUM_CODING); let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let mut coding_locks = Vec::with_cpacity(NUM_CODING); let mut coding_locks = Vec::with_capacity(NUM_CODING);
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_cpacity(NUM_CODING); let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
let coding_start = block_start + NUM_DATA - NUM_CODING; let coding_start = block_start + NUM_DATA - NUM_CODING;
let coding_end = block_start + NUM_DATA; let coding_end = block_start + NUM_DATA;
@ -234,16 +234,16 @@ pub fn generate_coding(
window[n].coding = Some(recycler.allocate()); window[n].coding = Some(recycler.allocate());
} }
let w_l = window[n].coding.clone().unwrap(); let coding = window[n].coding.clone().unwrap();
w_l.write().unwrap().set_size(max_data_size); let mut coding_wl = coding.write().unwrap();
w_l.write() {
.unwrap() let data = window[n].data.clone().unwrap();
.set_index(window[n].data.get_index().unwrap()); let data_rl = data.read().unwrap();
w_l.write() coding_wl.set_index(data_rl.get_index().unwrap()).unwrap();
.unwrap() coding_wl.set_id(data_rl.get_id().unwrap()).unwrap();
.set_id(window[n].data.get_id().unwrap()); }
coding_wl.set_size(max_data_size);
if w_l.write().unwrap().set_coding().is_err() { if coding_wl.set_coding().is_err() {
return Err(ErasureError::EncodeError); return Err(ErasureError::EncodeError);
} }
coding_blobs.push( coding_blobs.push(
@ -349,16 +349,21 @@ pub fn recover(
// add the data blobs we have into recovery blob vector // add the data blobs we have into recovery blob vector
for i in block_start..coding_end { for i in block_start..coding_end {
let j = i % window.len(); let j = i % window.len();
let mut b = &mut window[j];
if b.data.is_some() { if window[j].data.is_some() {
if meta.is_none() { if meta.is_none() {
let bl = b.data.clone().unwrap(); let bl = window[j].data.clone().unwrap();
meta = Some(bl.read().unwrap().meta.clone()); meta = Some(bl.read().unwrap().meta.clone());
} }
blobs.push(b.data.clone().expect("'blobs' arr in pb fn recover")); blobs.push(
window[j]
.data
.clone()
.expect("'blobs' arr in pb fn recover"),
);
} else { } else {
let n = recycler.allocate(); let n = recycler.allocate();
*b.data = Some(n.clone()); window[j].data = Some(n.clone());
// mark the missing memory // mark the missing memory
blobs.push(n); blobs.push(n);
erasures.push((i - block_start) as i32); erasures.push((i - block_start) as i32);
@ -366,16 +371,20 @@ pub fn recover(
} }
for i in coding_start..coding_end { for i in coding_start..coding_end {
let j = i % window.len(); let j = i % window.len();
let mut b = &mut window[j]; if window[j].coding.is_some() {
if b.coding.is_some() {
if size.is_none() { if size.is_none() {
let bl = b.coding.clone().unwrap(); let bl = window[j].coding.clone().unwrap();
size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE); size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE);
} }
blobs.push(b.coding.clone().expect("'blobs' arr in pb fn recover")); blobs.push(
window[j]
.coding
.clone()
.expect("'blobs' arr in pb fn recover"),
);
} else { } else {
let n = recycler.allocate(); let n = recycler.allocate();
*b = Some(n.clone()); window[j].coding = Some(n.clone());
//mark the missing memory //mark the missing memory
blobs.push(n); blobs.push(n);
erasures.push((i - block_start + NUM_DATA) as i32); erasures.push((i - block_start + NUM_DATA) as i32);
@ -516,11 +525,14 @@ mod test {
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
offset: usize, offset: usize,
num_blobs: usize, num_blobs: usize,
) -> [WindowSlot; 32] { ) -> Vec<WindowSlot> {
let mut window = [WindowSlot { let mut window = vec![
WindowSlot {
data: None, data: None,
coding: None, coding: None
}; 32]; };
32
];
let mut blobs = Vec::new(); let mut blobs = Vec::new();
for i in 0..num_blobs { for i in 0..num_blobs {
let b = blob_recycler.allocate(); let b = blob_recycler.allocate();
@ -563,14 +575,14 @@ mod test {
print_window(&window); print_window(&window);
// Generate the coding blocks // Generate the coding blocks
assert!(erasure::generate_coding(&mut window, blob_recycler, offset, num_blobs).is_ok()); assert!(erasure::generate_coding(&mut window, &blob_recycler, offset, num_blobs).is_ok());
println!("** after-gen-coding:"); println!("** after-gen-coding:");
print_window(&window); print_window(&window);
let erase_offset = offset; let erase_offset = offset;
// Create a hole in the window // Create a hole in the window
let refwindow = window[erase_offset].clone(); let refwindow = window[erase_offset].data.clone();
window[erase_offset] = None; window[erase_offset].data = None;
// Recover it from coding // Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
@ -578,7 +590,7 @@ mod test {
print_window(&window); print_window(&window);
// Check the result // Check the result
let window_l = window[erase_offset].clone().unwrap(); let window_l = window[erase_offset].data.clone().unwrap();
let window_l2 = window_l.read().unwrap(); let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap(); let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap(); let ref_l2 = ref_l.read().unwrap();

View File

@ -1,7 +1,7 @@
//! The `ncp` module implements the network control plane. //! The `ncp` module implements the network control plane.
use crdt::Crdt; use crdt::Crdt;
use packet::{BlobRecycler, SharedBlob}; use packet::BlobRecycler;
use result::Result; use result::Result;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -19,7 +19,7 @@ pub struct Ncp {
impl Ncp { impl Ncp {
pub fn new( pub fn new(
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
window: Arc<RwLock<Vec<Option<SharedBlob>>>>, window: streamer::Window,
gossip_listen_socket: UdpSocket, gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket, gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,

View File

@ -23,7 +23,33 @@ pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>; pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>; pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>; pub type BlobReceiver = Receiver<SharedBlobs>;
pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>;
#[derive(Clone)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
}
//impl Copy for WindowSlot {}
//impl Clone for WindowSlot {
// fn clone(&self) -> WindowSlot {
// WindowSlot {
// data: if self.data.is_some() {
// Some(self.data.clone())
// } else {
// None
// },
// coding: if self.coding.is_some() {
// Some(self.coding.clone())
// } else {
// None
// },
// }
// }
//}
pub type Window = Arc<RwLock<Vec<WindowSlot>>>;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum WindowError { pub enum WindowError {
@ -169,7 +195,7 @@ fn find_next_missing(
let reqs: Vec<_> = (*consumed..*received) let reqs: Vec<_> = (*consumed..*received)
.filter_map(|pix| { .filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize; let i = (pix % WINDOW_SIZE) as usize;
if window[i].is_none() { if window[i].data.is_none() {
let val = crdt.read().unwrap().window_index_request(pix as u64); let val = crdt.read().unwrap().window_index_request(pix as u64);
if let Ok((to, req)) = val { if let Ok((to, req)) = val {
return Some((to, req)); return Some((to, req));
@ -335,21 +361,21 @@ fn process_blob(
// of consumed to received and clear any old ones // of consumed to received and clear any old ones
for ix in *consumed..(pix + 1) { for ix in *consumed..(pix + 1) {
let k = (ix % WINDOW_SIZE) as usize; let k = (ix % WINDOW_SIZE) as usize;
if let Some(b) = &mut window[k] { if let Some(b) = &mut window[k].data {
if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
continue; continue;
} }
} }
if let Some(b) = mem::replace(&mut window[k], None) { if let Some(b) = mem::replace(&mut window[k].data, None) {
recycler.recycle(b); recycler.recycle(b);
} }
} }
// Insert the new blob into the window // Insert the new blob into the window
// spot should be free because we cleared it above // spot should be free because we cleared it above
if window[w].is_none() { if window[w].data.is_none() {
window[w] = Some(b); window[w].data = Some(b);
} else if let Some(cblob) = &window[w] { } else if let Some(cblob) = &window[w].data {
if cblob.read().unwrap().get_index().unwrap() != pix as u64 { if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w); warn!("{:x}: overrun blob at index {:}", debug_id, w);
} else { } else {
@ -360,11 +386,11 @@ fn process_blob(
let k = (*consumed % WINDOW_SIZE) as usize; let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed); trace!("k: {} consumed: {}", k, *consumed);
if window[k].is_none() { if window[k].data.is_none() {
break; break;
} }
let mut is_coding = false; let mut is_coding = false;
if let Some(ref cblob) = window[k] { if let Some(ref cblob) = window[k].data {
let cblob_r = cblob let cblob_r = cblob
.read() .read()
.expect("blob read lock for flogs streamer::window"); .expect("blob read lock for flogs streamer::window");
@ -376,29 +402,29 @@ fn process_blob(
} }
} }
if !is_coding { if !is_coding {
consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1; *consumed += 1;
} else { } else {
#[cfg(feature = "erasure")] // #[cfg(feature = "erasure")]
{ // {
let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); // let block_start = *consumed - (*consumed % erasure::NUM_DATA as u64);
let coding_end = block_start + erasure::NUM_CODED as u64; // let coding_end = block_start + erasure::NUM_DATA as u64;
// We've received all this block's data blobs, go and null out the window now // // We've received all this block's data blobs, go and null out the window now
for j in block_start..*consumed { // for j in block_start..*consumed {
if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { // if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) {
recycler.recycle(b); // recycler.recycle(b);
} // }
} // }
for j in *consumed..coding_end { // for j in *consumed..coding_end {
window[(j % WINDOW_SIZE) as usize] = None; // window[(j % WINDOW_SIZE) as usize] = None;
} // }
//
*consumed += erasure::MAX_MISSING as u64; // *consumed += erasure::MAX_MISSING as u64;
debug!( // debug!(
"skipping processing coding blob k: {} consumed: {}", // "skipping processing coding blob k: {} consumed: {}",
k, *consumed // k, *consumed
); // );
} // }
} }
} }
} }
@ -505,9 +531,9 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
.map(|(i, v)| { .map(|(i, v)| {
if i == (consumed % WINDOW_SIZE) as usize { if i == (consumed % WINDOW_SIZE) as usize {
"_" "_"
} else if v.is_none() { } else if v.data.is_none() {
"0" "0"
} else if let Some(ref cblob) = v { } else if let Some(ref cblob) = v.data {
if cblob.read().unwrap().is_coding() { if cblob.read().unwrap().is_coding() {
"C" "C"
} else { } else {
@ -523,7 +549,13 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
} }
pub fn default_window() -> Window { pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize])) Arc::new(RwLock::new(vec![
WindowSlot {
data: None,
coding: None,
};
WINDOW_SIZE as usize
]))
} }
/// Initialize a rebroadcast window with most recent Entry blobs /// Initialize a rebroadcast window with most recent Entry blobs
@ -557,8 +589,8 @@ pub fn initialized_window(
let ix = b.read().unwrap().get_index().expect("blob index"); let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize; let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos); trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none()); assert!(win[pos].data.is_none());
win[pos] = Some(b); win[pos].data = Some(b);
} }
} }
@ -648,8 +680,8 @@ fn broadcast(
for mut blobs in blobs_chunked { for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream // Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")] // #[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); // erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
let blobs_len = blobs.len(); let blobs_len = blobs.len();
debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);
@ -664,7 +696,7 @@ fn broadcast(
for b in &blobs { for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index"); let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize; let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos], None) { if let Some(x) = mem::replace(&mut win[pos].data, None) {
trace!( trace!(
"popped {} at {}", "popped {} at {}",
x.read().unwrap().get_index().unwrap(), x.read().unwrap().get_index().unwrap(),
@ -678,8 +710,8 @@ fn broadcast(
let ix = b.read().unwrap().get_index().expect("blob index"); let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize; let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos); trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none()); assert!(win[pos].data.is_none());
win[pos] = Some(b); win[pos].data = Some(b);
} }
} }
@ -688,6 +720,7 @@ fn broadcast(
{ {
erasure::generate_coding( erasure::generate_coding(
&mut window.write().unwrap(), &mut window.write().unwrap(),
recycler,
*receive_index as usize, *receive_index as usize,
blobs_len, blobs_len,
)?; )?;