diff --git a/src/streamer.rs b/src/streamer.rs index 2f10de43f4..c76b6b1348 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -294,17 +294,10 @@ fn recv_window( trace!("window w: {} size: {}", w, meta_size); { let mut window = locked_window.write().unwrap(); - if window[w].is_none() { - window[w] = Some(b); - } else if let Some(cblob) = &window[w] { - if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("overrun blob at index {:}", w); - } else { - debug!("duplicate blob at index {:}", w); - } - } - // recycle old references - for ix in *consumed..pix { + + // Search the window for old blobs in the window + // of consumed to received and clear any old ones + for ix in *consumed..(pix + 1) { let k = (ix % WINDOW_SIZE) as usize; if let Some(b) = &mut window[k] { if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { @@ -315,6 +308,18 @@ fn recv_window( recycler.recycle(b); } } + + // Insert the new blob into the window + // spot should be free because we cleared it above + if window[w].is_none() { + window[w] = Some(b); + } else if let Some(cblob) = &window[w] { + if cblob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("overrun blob at index {:}", w); + } else { + debug!("duplicate blob at index {:}", w); + } + } loop { let k = (*consumed % WINDOW_SIZE) as usize; trace!("k: {} consumed: {}", k, *consumed); @@ -324,11 +329,13 @@ fn recv_window( } let mut is_coding = false; if let &Some(ref cblob) = &window[k] { - if cblob + let cblob_r = cblob .read() - .expect("blob read lock for flags streamer::window") - .is_coding() - { + .expect("blob read lock for flogs streamer::window"); + if cblob_r.get_index().unwrap() < *consumed { + break; + } + if cblob_r.is_coding() { is_coding = true; } }