Rewrite find_next_missing, call it clear_slots
This commit is contained in:
@ -31,6 +31,21 @@ pub struct WindowSlot {
|
||||
pub leader_unknown: bool,
|
||||
}
|
||||
|
||||
impl WindowSlot {
|
||||
fn blob_index(&self) -> Option<u64> {
|
||||
match self.data {
|
||||
Some(ref blob) => blob.read().unwrap().get_index().ok(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_data(&mut self, recycler: &BlobRecycler) {
|
||||
if let Some(blob) = mem::replace(&mut self.data, None) {
|
||||
recycler.recycle(blob, "WindowSlot::clear_data");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Window = Vec<WindowSlot>;
|
||||
pub type SharedWindow = Arc<RwLock<Window>>;
|
||||
|
||||
@ -40,33 +55,23 @@ pub struct WindowIndex {
|
||||
pub coding: u64,
|
||||
}
|
||||
|
||||
fn find_next_missing(
|
||||
/// Finds available slots, clears them, and returns their indices.
|
||||
fn clear_window_slots(
|
||||
window: &mut Window,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
) -> Vec<(SocketAddr, Vec<u8>)> {
|
||||
) -> Vec<u64> {
|
||||
(consumed..received)
|
||||
.filter_map(|pix| {
|
||||
let i = (pix % WINDOW_SIZE) as usize;
|
||||
let ref mut slot = &mut window[i];
|
||||
|
||||
if let Some(blob) = mem::replace(&mut slot.data, None) {
|
||||
let blob_idx = blob.read().unwrap().get_index().unwrap();
|
||||
if let Some(blob_idx) = window[i].blob_index() {
|
||||
if blob_idx == pix {
|
||||
mem::replace(&mut slot.data, Some(blob));
|
||||
} else {
|
||||
recycler.recycle(blob, "find_next_missing");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
if slot.data.is_some() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let val = crdt.read().unwrap().window_index_request(pix);
|
||||
val.ok()
|
||||
window[i].clear_data(recycler);
|
||||
Some(pix)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@ -134,7 +139,12 @@ fn repair_window(
|
||||
}
|
||||
|
||||
let mut window = window.write().unwrap();
|
||||
let reqs = find_next_missing(&mut window, crdt, recycler, consumed, highest_lost);
|
||||
let idxs = clear_window_slots(&mut window, recycler, consumed, highest_lost);
|
||||
let reqs: Vec<_> = idxs
|
||||
.into_iter()
|
||||
.filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
|
||||
.collect();
|
||||
|
||||
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!(
|
||||
@ -146,7 +156,7 @@ fn repair_window(
|
||||
reqs.len()
|
||||
);
|
||||
|
||||
for (to, _) in reqs.clone() {
|
||||
for (to, _) in &reqs.clone() {
|
||||
trace!("{}: repair_window request to {}", id, to);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user