From 8effa4e3e02ddf1a64cb7823746fbd7ae69ab54d Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 27 Jun 2018 21:11:16 -0700 Subject: [PATCH] Clear old blobs before putting in the new one Otherwise we will just warn about overrun and not insert new blob Also, break if the index we find is less than consumed otherwise we can infinite loop --- src/streamer.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 2f10de43f4..c76b6b1348 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -294,17 +294,10 @@ fn recv_window( trace!("window w: {} size: {}", w, meta_size); { let mut window = locked_window.write().unwrap(); - if window[w].is_none() { - window[w] = Some(b); - } else if let Some(cblob) = &window[w] { - if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("overrun blob at index {:}", w); - } else { - debug!("duplicate blob at index {:}", w); - } - } - // recycle old references - for ix in *consumed..pix { + + // Search the window for old blobs in the window + // of consumed to received and clear any old ones + for ix in *consumed..(pix + 1) { let k = (ix % WINDOW_SIZE) as usize; if let Some(b) = &mut window[k] { if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { @@ -315,6 +308,18 @@ fn recv_window( recycler.recycle(b); } } + + // Insert the new blob into the window + // spot should be free because we cleared it above + if window[w].is_none() { + window[w] = Some(b); + } else if let Some(cblob) = &window[w] { + if cblob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("overrun blob at index {:}", w); + } else { + debug!("duplicate blob at index {:}", w); + } + } loop { let k = (*consumed % WINDOW_SIZE) as usize; trace!("k: {} consumed: {}", k, *consumed); @@ -324,11 +329,13 @@ fn recv_window( } let mut is_coding = false; if let &Some(ref cblob) = &window[k] { - if cblob + let cblob_r = cblob .read() - .expect("blob read lock for flags streamer::window") - .is_coding() - { + .expect("blob read lock for flogs streamer::window"); + if cblob_r.get_index().unwrap() < *consumed { + break; + } + if cblob_r.is_coding() { is_coding = true; } }