From 257acdcda183ed162286812c265226ab2cc2ae8d Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 17 Jul 2018 15:00:22 -0700 Subject: [PATCH] building now --- src/crdt.rs | 12 ++--- src/erasure.rs | 74 ++++++++++++++++++------------- src/ncp.rs | 4 +- src/streamer.rs | 113 +++++++++++++++++++++++++++++++----------------- 4 files changed, 124 insertions(+), 79 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index f9c89ff454..862fc7bffc 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -578,10 +578,10 @@ impl Crdt { for i in *transmit_index..received_index { let is = i as usize; let k = is % window_l.len(); - assert!(window_l[k].is_some()); + assert!(window_l[k].data.is_some()); let pos = is % broadcast_table.len(); - orders.push((window_l[k].clone(), &broadcast_table[pos])); + orders.push((window_l[k].data.clone(), &broadcast_table[pos])); } trace!("broadcast orders table {}", orders.len()); @@ -917,7 +917,7 @@ impl Crdt { blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); - if let Some(blob) = &window.read().unwrap()[pos] { + if let Some(blob) = &window.read().unwrap()[pos].data { let mut wblob = blob.write().unwrap(); let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { @@ -960,7 +960,7 @@ impl Crdt { } } else { inc_new_counter!("crdt-window-request-fail", 1); - assert!(window.read().unwrap()[pos].is_none()); + assert!(window.read().unwrap()[pos].data.is_none()); info!( "{:x}: failed RequestWindowIndex {:x} {} {}", me.debug_id(), @@ -1809,7 +1809,7 @@ mod tests { assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; - window.write().unwrap()[0] = Some(out); + window.write().unwrap()[0].data = Some(out); let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_some()); let v = rv.unwrap(); @@ -1838,7 +1838,7 @@ mod tests { let blob = recycler.allocate(); let blob_size = 200; blob.write().unwrap().meta.size = blob_size; - window.write().unwrap()[0] = Some(blob); + window.write().unwrap()[0].data = Some(blob); let num_requests: u32 = 64; for i in 0..num_requests { diff --git a/src/erasure.rs b/src/erasure.rs index 2ae8d9be81..f68bab69fc 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -223,8 +223,8 @@ pub fn generate_coding( } let mut coding_blobs = Vec::with_capacity(NUM_CODING); - let mut coding_locks = Vec::with_cpacity(NUM_CODING); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_cpacity(NUM_CODING); + let mut coding_locks = Vec::with_capacity(NUM_CODING); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); let coding_start = block_start + NUM_DATA - NUM_CODING; let coding_end = block_start + NUM_DATA; @@ -234,16 +234,16 @@ pub fn generate_coding( window[n].coding = Some(recycler.allocate()); } - let w_l = window[n].coding.clone().unwrap(); - w_l.write().unwrap().set_size(max_data_size); - w_l.write() - .unwrap() - .set_index(window[n].data.get_index().unwrap()); - w_l.write() - .unwrap() - .set_id(window[n].data.get_id().unwrap()); - - if w_l.write().unwrap().set_coding().is_err() { + let coding = window[n].coding.clone().unwrap(); + let mut coding_wl = coding.write().unwrap(); + { + let data = window[n].data.clone().unwrap(); + let data_rl = data.read().unwrap(); + coding_wl.set_index(data_rl.get_index().unwrap()).unwrap(); + coding_wl.set_id(data_rl.get_id().unwrap()).unwrap(); + } + coding_wl.set_size(max_data_size); + if coding_wl.set_coding().is_err() { return Err(ErasureError::EncodeError); } coding_blobs.push( @@ -349,16 +349,21 @@ pub fn recover( // add the data blobs we have into recovery blob vector for i in block_start..coding_end { let j = i % window.len(); - let mut b = &mut window[j]; - if b.data.is_some() { + + if window[j].data.is_some() { if meta.is_none() { - let bl = b.data.clone().unwrap(); + let bl = window[j].data.clone().unwrap(); meta = Some(bl.read().unwrap().meta.clone()); } - blobs.push(b.data.clone().expect("'blobs' arr in pb fn recover")); + blobs.push( + window[j] + .data + .clone() + .expect("'blobs' arr in pb fn recover"), + ); } else { let n = recycler.allocate(); - *b.data = Some(n.clone()); + window[j].data = Some(n.clone()); // mark the missing memory blobs.push(n); erasures.push((i - block_start) as i32); @@ -366,16 +371,20 @@ pub fn recover( } for i in coding_start..coding_end { let j = i % window.len(); - let mut b = &mut window[j]; - if b.coding.is_some() { + if window[j].coding.is_some() { if size.is_none() { - let bl = b.coding.clone().unwrap(); + let bl = window[j].coding.clone().unwrap(); size = Some(bl.read().unwrap().meta.size - BLOB_HEADER_SIZE); } - blobs.push(b.coding.clone().expect("'blobs' arr in pb fn recover")); + blobs.push( + window[j] + .coding + .clone() + .expect("'blobs' arr in pb fn recover"), + ); } else { let n = recycler.allocate(); - *b = Some(n.clone()); + window[j].coding = Some(n.clone()); //mark the missing memory blobs.push(n); erasures.push((i - block_start + NUM_DATA) as i32); @@ -516,11 +525,14 @@ mod test { blob_recycler: &BlobRecycler, offset: usize, num_blobs: usize, - ) -> [WindowSlot; 32] { - let mut window = [WindowSlot { - data: None, - coding: None, - }; 32]; + ) -> Vec { + let mut window = vec![ + WindowSlot { + data: None, + coding: None + }; + 32 + ]; let mut blobs = Vec::new(); for i in 0..num_blobs { let b = blob_recycler.allocate(); @@ -563,14 +575,14 @@ mod test { print_window(&window); // Generate the coding blocks - assert!(erasure::generate_coding(&mut window, blob_recycler, offset, num_blobs).is_ok()); + assert!(erasure::generate_coding(&mut window, &blob_recycler, offset, num_blobs).is_ok()); println!("** after-gen-coding:"); print_window(&window); let erase_offset = offset; // Create a hole in the window - let refwindow = window[erase_offset].clone(); - window[erase_offset] = None; + let refwindow = window[erase_offset].data.clone(); + window[erase_offset].data = None; // Recover it from coding assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); @@ -578,7 +590,7 @@ mod test { print_window(&window); // Check the result - let window_l = window[erase_offset].clone().unwrap(); + let window_l = window[erase_offset].data.clone().unwrap(); let window_l2 = window_l.read().unwrap(); let ref_l = refwindow.clone().unwrap(); let ref_l2 = ref_l.read().unwrap(); diff --git a/src/ncp.rs b/src/ncp.rs index 062344592d..2bcf624a98 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -1,7 +1,7 @@ //! The `ncp` module implements the network control plane. use crdt::Crdt; -use packet::{BlobRecycler, SharedBlob}; +use packet::BlobRecycler; use result::Result; use service::Service; use std::net::UdpSocket; @@ -19,7 +19,7 @@ pub struct Ncp { impl Ncp { pub fn new( crdt: &Arc>, - window: Arc>>>, + window: streamer::Window, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, exit: Arc, diff --git a/src/streamer.rs b/src/streamer.rs index 6082ff7214..c6062899e3 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -23,7 +23,33 @@ pub type PacketReceiver = Receiver; pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; -pub type Window = Arc>>>; + +#[derive(Clone)] +pub struct WindowSlot { + pub data: Option, + pub coding: Option, +} + +//impl Copy for WindowSlot {} + +//impl Clone for WindowSlot { +// fn clone(&self) -> WindowSlot { +// WindowSlot { +// data: if self.data.is_some() { +// Some(self.data.clone()) +// } else { +// None +// }, +// coding: if self.coding.is_some() { +// Some(self.coding.clone()) +// } else { +// None +// }, +// } +// } +//} + +pub type Window = Arc>>; #[derive(Debug, PartialEq, Eq)] pub enum WindowError { @@ -169,7 +195,7 @@ fn find_next_missing( let reqs: Vec<_> = (*consumed..*received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; - if window[i].is_none() { + if window[i].data.is_none() { let val = crdt.read().unwrap().window_index_request(pix as u64); if let Ok((to, req)) = val { return Some((to, req)); @@ -335,21 +361,21 @@ fn process_blob( // of consumed to received and clear any old ones for ix in *consumed..(pix + 1) { let k = (ix % WINDOW_SIZE) as usize; - if let Some(b) = &mut window[k] { + if let Some(b) = &mut window[k].data { if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { continue; } } - if let Some(b) = mem::replace(&mut window[k], None) { + if let Some(b) = mem::replace(&mut window[k].data, None) { recycler.recycle(b); } } // Insert the new blob into the window // spot should be free because we cleared it above - if window[w].is_none() { - window[w] = Some(b); - } else if let Some(cblob) = &window[w] { + if window[w].data.is_none() { + window[w].data = Some(b); + } else if let Some(cblob) = &window[w].data { if cblob.read().unwrap().get_index().unwrap() != pix as u64 { warn!("{:x}: overrun blob at index {:}", debug_id, w); } else { @@ -360,11 +386,11 @@ fn process_blob( let k = (*consumed % WINDOW_SIZE) as usize; trace!("k: {} consumed: {}", k, *consumed); - if window[k].is_none() { + if window[k].data.is_none() { break; } let mut is_coding = false; - if let Some(ref cblob) = window[k] { + if let Some(ref cblob) = window[k].data { let cblob_r = cblob .read() .expect("blob read lock for flogs streamer::window"); @@ -376,29 +402,29 @@ fn process_blob( } } if !is_coding { - consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); + consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); *consumed += 1; } else { - #[cfg(feature = "erasure")] - { - let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); - let coding_end = block_start + erasure::NUM_CODED as u64; - // We've received all this block's data blobs, go and null out the window now - for j in block_start..*consumed { - if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { - recycler.recycle(b); - } - } - for j in *consumed..coding_end { - window[(j % WINDOW_SIZE) as usize] = None; - } - - *consumed += erasure::MAX_MISSING as u64; - debug!( - "skipping processing coding blob k: {} consumed: {}", - k, *consumed - ); - } + // #[cfg(feature = "erasure")] + // { + // let block_start = *consumed - (*consumed % erasure::NUM_DATA as u64); + // let coding_end = block_start + erasure::NUM_DATA as u64; + // // We've received all this block's data blobs, go and null out the window now + // for j in block_start..*consumed { + // if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { + // recycler.recycle(b); + // } + // } + // for j in *consumed..coding_end { + // window[(j % WINDOW_SIZE) as usize] = None; + // } + // + // *consumed += erasure::MAX_MISSING as u64; + // debug!( + // "skipping processing coding blob k: {} consumed: {}", + // k, *consumed + // ); + // } } } } @@ -505,9 +531,9 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { .map(|(i, v)| { if i == (consumed % WINDOW_SIZE) as usize { "_" - } else if v.is_none() { + } else if v.data.is_none() { "0" - } else if let Some(ref cblob) = v { + } else if let Some(ref cblob) = v.data { if cblob.read().unwrap().is_coding() { "C" } else { @@ -523,7 +549,13 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) { } pub fn default_window() -> Window { - Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize])) + Arc::new(RwLock::new(vec![ + WindowSlot { + data: None, + coding: None, + }; + WINDOW_SIZE as usize + ])) } /// Initialize a rebroadcast window with most recent Entry blobs @@ -557,8 +589,8 @@ pub fn initialized_window( let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("caching {} at {}", ix, pos); - assert!(win[pos].is_none()); - win[pos] = Some(b); + assert!(win[pos].data.is_none()); + win[pos].data = Some(b); } } @@ -648,8 +680,8 @@ fn broadcast( for mut blobs in blobs_chunked { // Insert the coding blobs into the blob stream - #[cfg(feature = "erasure")] - erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); + // #[cfg(feature = "erasure")] + // erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); let blobs_len = blobs.len(); debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); @@ -664,7 +696,7 @@ fn broadcast( for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; - if let Some(x) = mem::replace(&mut win[pos], None) { + if let Some(x) = mem::replace(&mut win[pos].data, None) { trace!( "popped {} at {}", x.read().unwrap().get_index().unwrap(), @@ -678,8 +710,8 @@ fn broadcast( let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("caching {} at {}", ix, pos); - assert!(win[pos].is_none()); - win[pos] = Some(b); + assert!(win[pos].data.is_none()); + win[pos].data = Some(b); } } @@ -688,6 +720,7 @@ fn broadcast( { erasure::generate_coding( &mut window.write().unwrap(), + recycler, *receive_index as usize, blobs_len, )?;