diff --git a/src/window.rs b/src/window.rs index bc5650b1a3..1976fd3bb9 100644 --- a/src/window.rs +++ b/src/window.rs @@ -56,6 +56,7 @@ pub struct WindowIndex { } pub trait WindowUtil { + /// Finds available slots, clears them, and returns their indices. fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec; fn repair( @@ -85,7 +86,18 @@ pub trait WindowUtil { impl WindowUtil for Window { fn clear_slots(&mut self, recycler: &BlobRecycler, consumed: u64, received: u64) -> Vec { - clear_window_slots(self, recycler, consumed, received) + (consumed..received) + .filter_map(|pix| { + let i = (pix % WINDOW_SIZE) as usize; + if let Some(blob_idx) = self[i].blob_index() { + if blob_idx == pix { + return None; + } + } + self[i].clear_data(recycler); + Some(pix) + }) + .collect() } fn repair( @@ -97,13 +109,87 @@ impl WindowUtil for Window { consumed: u64, received: u64, ) -> Vec<(SocketAddr, Vec)> { - repair_window(self, crdt, recycler, id, times, consumed, received) + let num_peers = crdt.read().unwrap().table.len() as u64; + let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received); + + let idxs = self.clear_slots(recycler, consumed, highest_lost); + let reqs: Vec<_> = idxs + .into_iter() + .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) + .collect(); + + inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); + if log_enabled!(Level::Trace) { + trace!( + "{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", + id, + times, + consumed, + highest_lost, + reqs.len() + ); + + for (to, _) in &reqs { + trace!("{}: repair_window request to {}", id, to); + } + } + reqs } fn print(&self, id: &Pubkey, consumed: u64) -> String { - print_window(self, id, consumed) + let pointer: Vec<_> = self + .iter() + .enumerate() + .map(|(i, _v)| { + if i == (consumed % WINDOW_SIZE) as usize { + "V" + } else { + " " + } + }) + .collect(); + + let buf: Vec<_> = self + .iter() + .map(|v| { + if v.data.is_none() && v.coding.is_none() { + "O" + } else if v.data.is_some() && v.coding.is_some() { + "D" + } else if v.data.is_some() { + // coding.is_none() + "d" + } else { + // data.is_none() + "c" + } + }) + .collect(); + format!( + "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}", + id, + consumed, + pointer.join(""), + id, + consumed, + buf.join("") + ) } + /// process a blob: Add blob to the window. If a continuous set of blobs + /// starting from consumed is thereby formed, add that continuous + /// range of blobs to a queue to be sent on to the next stage. + /// + /// * `self` - the window we're operating on + /// * `id` - this node's id + /// * `blob` - the blob to be processed into the window and rebroadcast + /// * `pix` - the index of the blob, corresponds to + /// the entry height of this blob + /// * `consume_queue` - output, blobs to be rebroadcast are placed here + /// * `recycler` - where to return the blob once processed, also where + /// to return old blobs from the window + /// * `consumed` - input/output, the entry-height to which this + /// node has populated and rebroadcast entries fn process_blob( &mut self, id: &Pubkey, @@ -115,39 +201,89 @@ impl WindowUtil for Window { leader_unknown: bool, pending_retransmits: &mut bool, ) { - process_blob( - self, - id, - blob, - pix, - consume_queue, - recycler, - consumed, - leader_unknown, - pending_retransmits, - ); - } -} + let w = (pix % WINDOW_SIZE) as usize; -/// Finds available slots, clears them, and returns their indices. -fn clear_window_slots( - window: &mut Window, - recycler: &BlobRecycler, - consumed: u64, - received: u64, -) -> Vec { - (consumed..received) - .filter_map(|pix| { - let i = (pix % WINDOW_SIZE) as usize; - if let Some(blob_idx) = window[i].blob_index() { - if blob_idx == pix { - return None; - } + let is_coding = { + let blob_r = blob + .read() + .expect("blob read lock for flogs streamer::window"); + blob_r.is_coding() + }; + + // insert a newly received blob into a window slot, clearing out and recycling any previous + // blob unless the incoming blob is a duplicate (based on idx) + // returns whether the incoming is a duplicate blob + fn insert_blob_is_dup( + id: &Pubkey, + blob: SharedBlob, + pix: u64, + window_slot: &mut Option, + recycler: &BlobRecycler, + c_or_d: &str, + ) -> bool { + if let Some(old) = mem::replace(window_slot, Some(blob)) { + let is_dup = old.read().unwrap().get_index().unwrap() == pix; + recycler.recycle(old, "insert_blob_is_dup"); + trace!( + "{}: occupied {} window slot {:}, is_dup: {}", + id, + c_or_d, + pix, + is_dup + ); + is_dup + } else { + trace!("{}: empty {} window slot {:}", id, c_or_d, pix); + false } - window[i].clear_data(recycler); - Some(pix) - }) - .collect() + } + + // insert the new blob into the window, overwrite and recycle old (or duplicate) entry + let is_duplicate = if is_coding { + insert_blob_is_dup(id, blob, pix, &mut self[w].coding, recycler, "coding") + } else { + insert_blob_is_dup(id, blob, pix, &mut self[w].data, recycler, "data") + }; + + if is_duplicate { + return; + } + + self[w].leader_unknown = leader_unknown; + *pending_retransmits = true; + + #[cfg(feature = "erasure")] + { + if erasure::recover( + id, + recycler, + window, + *consumed, + (*consumed % WINDOW_SIZE) as usize, + ).is_err() + { + trace!("{}: erasure::recover failed", id); + } + } + + // push all contiguous blobs into consumed queue, increment consumed + loop { + let k = (*consumed % WINDOW_SIZE) as usize; + trace!("{}: k: {} consumed: {}", id, k, *consumed,); + + if let Some(blob) = &self[k].data { + if blob.read().unwrap().get_index().unwrap() < *consumed { + // window wrap-around, end of received + break; + } + } else { + // self[k].data is None, end of received + break; + } + consume_queue.push(self[k].data.clone().expect("clone in fn recv_window")); + *consumed += 1; + } + } } fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 { @@ -186,42 +322,6 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { thread_rng().gen_range(0, *times as u64) == 0 } -fn repair_window( - window: &mut Window, - crdt: &Arc>, - recycler: &BlobRecycler, - id: &Pubkey, - times: usize, - consumed: u64, - received: u64, -) -> Vec<(SocketAddr, Vec)> { - let num_peers = crdt.read().unwrap().table.len() as u64; - let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received); - - let idxs = window.clear_slots(recycler, consumed, highest_lost); - let reqs: Vec<_> = idxs - .into_iter() - .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) - .collect(); - - inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); - if log_enabled!(Level::Trace) { - trace!( - "{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", - id, - times, - consumed, - highest_lost, - reqs.len() - ); - - for (to, _) in &reqs { - trace!("{}: repair_window request to {}", id, to); - } - } - reqs -} - fn add_block_to_retransmit_queue( b: &SharedBlob, leader_id: Pubkey, @@ -320,115 +420,6 @@ fn retransmit_all_leader_blocks( Ok(()) } -/// process a blob: Add blob to the window. If a continuous set of blobs -/// starting from consumed is thereby formed, add that continuous -/// range of blobs to a queue to be sent on to the next stage. -/// -/// * `id` - this node's id -/// * `blob` - the blob to be processed into the window and rebroadcast -/// * `pix` - the index of the blob, corresponds to -/// the entry height of this blob -/// * `consume_queue` - output, blobs to be rebroadcast are placed here -/// * `window` - the window we're operating on -/// * `recycler` - where to return the blob once processed, also where -/// to return old blobs from the window -/// * `consumed` - input/output, the entry-height to which this -/// node has populated and rebroadcast entries -fn process_blob( - window: &mut Window, - id: &Pubkey, - blob: SharedBlob, - pix: u64, - consume_queue: &mut SharedBlobs, - recycler: &BlobRecycler, - consumed: &mut u64, - leader_unknown: bool, - pending_retransmits: &mut bool, -) { - let w = (pix % WINDOW_SIZE) as usize; - - let is_coding = { - let blob_r = blob - .read() - .expect("blob read lock for flogs streamer::window"); - blob_r.is_coding() - }; - - // insert a newly received blob into a window slot, clearing out and recycling any previous - // blob unless the incoming blob is a duplicate (based on idx) - // returns whether the incoming is a duplicate blob - fn insert_blob_is_dup( - id: &Pubkey, - blob: SharedBlob, - pix: u64, - window_slot: &mut Option, - recycler: &BlobRecycler, - c_or_d: &str, - ) -> bool { - if let Some(old) = mem::replace(window_slot, Some(blob)) { - let is_dup = old.read().unwrap().get_index().unwrap() == pix; - recycler.recycle(old, "insert_blob_is_dup"); - trace!( - "{}: occupied {} window slot {:}, is_dup: {}", - id, - c_or_d, - pix, - is_dup - ); - is_dup - } else { - trace!("{}: empty {} window slot {:}", id, c_or_d, pix); - false - } - } - - // insert the new blob into the window, overwrite and recycle old (or duplicate) entry - let is_duplicate = if is_coding { - insert_blob_is_dup(id, blob, pix, &mut window[w].coding, recycler, "coding") - } else { - insert_blob_is_dup(id, blob, pix, &mut window[w].data, recycler, "data") - }; - - if is_duplicate { - return; - } - - window[w].leader_unknown = leader_unknown; - *pending_retransmits = true; - - #[cfg(feature = "erasure")] - { - if erasure::recover( - id, - recycler, - window, - *consumed, - (*consumed % WINDOW_SIZE) as usize, - ).is_err() - { - trace!("{}: erasure::recover failed", id); - } - } - - // push all contiguous blobs into consumed queue, increment consumed - loop { - let k = (*consumed % WINDOW_SIZE) as usize; - trace!("{}: k: {} consumed: {}", id, k, *consumed,); - - if let Some(blob) = &window[k].data { - if blob.read().unwrap().get_index().unwrap() < *consumed { - // window wrap-around, end of received - break; - } - } else { - // window[k].data is None, end of received - break; - } - consume_queue.push(window[k].data.clone().expect("clone in fn recv_window")); - *consumed += 1; - } -} - fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool { // Prevent receive window from running over // Got a blob which has already been consumed, skip it @@ -555,46 +546,6 @@ fn recv_window( Ok(()) } -pub fn print_window(window: &Window, id: &Pubkey, consumed: u64) -> String { - let pointer: Vec<_> = window - .iter() - .enumerate() - .map(|(i, _v)| { - if i == (consumed % WINDOW_SIZE) as usize { - "V" - } else { - " " - } - }) - .collect(); - - let buf: Vec<_> = window - .iter() - .map(|v| { - if v.data.is_none() && v.coding.is_none() { - "O" - } else if v.data.is_some() && v.coding.is_some() { - "D" - } else if v.data.is_some() { - // coding.is_none() - "d" - } else { - // data.is_none() - "c" - } - }) - .collect(); - format!( - "\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}", - id, - consumed, - pointer.join(""), - id, - consumed, - buf.join("") - ) -} - pub fn default_window() -> SharedWindow { Arc::new(RwLock::new(vec![ WindowSlot::default();