diff --git a/src/window.rs b/src/window.rs index 660b12b4ee..cbfca3e66e 100644 --- a/src/window.rs +++ b/src/window.rs @@ -31,6 +31,21 @@ pub struct WindowSlot { pub leader_unknown: bool, } +impl WindowSlot { + fn blob_index(&self) -> Option { + match self.data { + Some(ref blob) => blob.read().unwrap().get_index().ok(), + None => None, + } + } + + fn clear_data(&mut self, recycler: &BlobRecycler) { + if let Some(blob) = mem::replace(&mut self.data, None) { + recycler.recycle(blob, "WindowSlot::clear_data"); + } + } +} + type Window = Vec; pub type SharedWindow = Arc>; @@ -40,33 +55,23 @@ pub struct WindowIndex { pub coding: u64, } -fn find_next_missing( +/// Finds available slots, clears them, and returns their indices. +fn clear_window_slots( window: &mut Window, - crdt: &Arc>, recycler: &BlobRecycler, consumed: u64, received: u64, -) -> Vec<(SocketAddr, Vec)> { +) -> Vec { (consumed..received) .filter_map(|pix| { let i = (pix % WINDOW_SIZE) as usize; - let ref mut slot = &mut window[i]; - - if let Some(blob) = mem::replace(&mut slot.data, None) { - let blob_idx = blob.read().unwrap().get_index().unwrap(); + if let Some(blob_idx) = window[i].blob_index() { if blob_idx == pix { - mem::replace(&mut slot.data, Some(blob)); - } else { - recycler.recycle(blob, "find_next_missing"); + return None; } } - - if slot.data.is_some() { - return None; - } - - let val = crdt.read().unwrap().window_index_request(pix); - val.ok() + window[i].clear_data(recycler); + Some(pix) }) .collect() } @@ -134,7 +139,12 @@ fn repair_window( } let mut window = window.write().unwrap(); - let reqs = find_next_missing(&mut window, crdt, recycler, consumed, highest_lost); + let idxs = clear_window_slots(&mut window, recycler, consumed, highest_lost); + let reqs: Vec<_> = idxs + .into_iter() + .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) + .collect(); + inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); if log_enabled!(Level::Trace) { trace!( @@ -146,7 +156,7 @@ fn repair_window( reqs.len() ); - for (to, _) in reqs.clone() { + for (to, _) in &reqs.clone() { trace!("{}: repair_window request to {}", id, to); } }