diff --git a/src/erasure.rs b/src/erasure.rs
index 6cb9090e20..d58d0dcdde 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -303,15 +303,13 @@ pub fn recover(
     start: usize,
     num_blobs: usize,
 ) -> Result<()> {
-    let num_blocks = num_blobs / NUM_DATA;
+    let num_blocks = (num_blobs / NUM_DATA) + 1;
     let mut block_start = start - (start % NUM_DATA);
 
-    if num_blocks > 0 {
-        debug!(
-            "num_blocks: {} start: {} num_blobs: {} block_start: {}",
-            num_blocks, start, num_blobs, block_start
-        );
-    }
+    debug!(
+        "num_blocks: {} start: {} num_blobs: {} block_start: {}",
+        num_blocks, start, num_blobs, block_start
+    );
 
     for _ in 0..num_blocks {
         let mut data_missing = 0;
diff --git a/src/streamer.rs b/src/streamer.rs
index b975a2422d..ddf49a7adc 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -298,6 +298,40 @@ fn retransmit_all_leader_blocks(
     Ok(())
 }
 
+/// Make space in the window for newly received blobs that come after
+/// consumed and before received; clear out any old ones.
+fn reset_slots(
+    window: &mut [WindowSlot],
+    recycler: &BlobRecycler,
+    consumed: u64,
+    received: u64,
+    debug_id: u64,
+) {
+    for ix in consumed..received {
+        let k = (ix % WINDOW_SIZE) as usize;
+
+        let mut old = false;
+        if let Some(b) = &window[k].data {
+            old = b.read().unwrap().get_index().unwrap() < consumed;
+        }
+        if old {
+            if let Some(b) = mem::replace(&mut window[k].data, None) {
+                debug!("{:x}: recycling data blob at index {:}", debug_id, k);
+                recycler.recycle(b);
+            }
+        }
+        if let Some(b) = &window[k].coding {
+            old = b.read().unwrap().get_index().unwrap() < consumed;
+        }
+        if old {
+            if let Some(b) = mem::replace(&mut window[k].coding, None) {
+                debug!("{:x}: recycling coding blob at index {:}", debug_id, k);
+                recycler.recycle(b);
+            }
+        }
+    }
+}
+
 /// process a blob: Add blob to the window. If a continuous set of blobs
 /// starting from consumed is thereby formed, add that continuous
 /// range of blobs to a queue to be sent on to the next stage.
@@ -326,30 +360,11 @@ fn process_blob(
 ) {
     let mut window = window.write().unwrap();
 
-    // Search the window for old blobs in the window
-    // of consumed to received and clear any old ones
-    for ix in *consumed..(received + 1) {
-        let k = (ix % WINDOW_SIZE) as usize;
-
-        let mut old = false;
-        if let Some(b) = &window[k].data {
-            old = b.read().unwrap().get_index().unwrap() < *consumed;
-        }
-        if old {
-            if let Some(b) = mem::replace(&mut window[k].data, None) {
-                debug!("{:x}: recycling data blob at index {:}", debug_id, k);
-                recycler.recycle(b);
-            }
-        }
-        if let Some(b) = &window[k].coding {
-            old = b.read().unwrap().get_index().unwrap() < *consumed;
-        }
-        if old {
-            if let Some(b) = mem::replace(&mut window[k].coding, None) {
-                debug!("{:x}: recycling coding blob at index {:}", debug_id, k);
-                recycler.recycle(b);
-            }
-        }
+    if pix == received {
+        // When pix == received, we've *just* updated received, which
+        // means new slots between consumed and received may have been
+        // exposed; clean up any old blobs in that range.
+        reset_slots(&mut window, recycler, *consumed, received, debug_id);
     }
 
     let is_coding = {
@@ -405,17 +420,20 @@ fn process_blob(
     // }
 
     // push all contiguous blobs into consumed queue, increment consumed
-    while *consumed < received {
+    loop {
         let k = (*consumed % WINDOW_SIZE) as usize;
-        trace!("k: {} consumed: {}", k, *consumed);
+        trace!("k: {} consumed: {} received: {}", k, *consumed, received);
 
-        if window[k].data.is_none() {
+        if let Some(blob) = &window[k].data {
+            if blob.read().unwrap().get_index().unwrap() < *consumed {
+                // window wrap-around, end of received
+                break;
+            }
+        } else {
+            // window[k].data is None, end of received
             break;
         }
-        if let Some(blob) = &window[w].data {
-            assert!(blob.read().unwrap().meta.size < BLOB_SIZE);
-        }
 
        consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
         *consumed += 1;
     }
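
Reviewer note on the `recover()` change: the old `num_blobs / NUM_DATA` rounds down, so a trailing, partially filled erasure block was never examined; the `+ 1` makes the loop step one block past the last full one. A minimal sketch of the arithmetic (NUM_DATA = 16 is an assumed value, for illustration only):

```rust
// Sketch of the block-count arithmetic only, not the real recover() logic.
const NUM_DATA: usize = 16; // assumed value for illustration

fn main() {
    let num_blobs = 20; // one full 16-blob block plus a 4-blob partial block

    // Before: integer division drops the partial block, so recovery was
    // never attempted over blobs 16..20 of this run.
    assert_eq!(num_blobs / NUM_DATA, 1);

    // After: the loop also walks the trailing partial block. When num_blobs
    // is an exact multiple of NUM_DATA this visits one extra block, which
    // the per-block missing-blob accounting presumably skips harmlessly.
    assert_eq!((num_blobs / NUM_DATA) + 1, 2);
}
```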
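
The new termination tests in the consume loop rely on a ring-buffer invariant: slot `k` can only legitimately hold the blob whose index `i` satisfies `i % WINDOW_SIZE == k` and `i >= *consumed`, so a stored index below `*consumed` means the slot still holds a leftover from a previous lap of the window. A standalone sketch of that check, using assumed names and a toy window of bare indices rather than the repo's `WindowSlot`/blob types:

```rust
const WINDOW_SIZE: u64 = 8; // toy size for illustration

/// Walk the ring from `consumed`, stopping at an empty slot or at a
/// slot whose index predates `consumed` (a previous lap's leftover).
fn consume_contiguous(window: &[Option<u64>], mut consumed: u64) -> u64 {
    loop {
        let k = (consumed % WINDOW_SIZE) as usize;
        match window[k] {
            // Stale blob from an earlier lap: the contiguous run is over.
            Some(index) if index < consumed => break,
            // Empty slot: end of received.
            None => break,
            Some(_) => consumed += 1,
        }
    }
    consumed
}

fn main() {
    // Slots 0..4 hold fresh blobs 8..12; slots 4..8 still hold stale
    // blobs 4..8 from the previous lap of the ring.
    let window = [
        Some(8), Some(9), Some(10), Some(11), // fresh
        Some(4), Some(5), Some(6), Some(7),   // stale
    ];
    assert_eq!(consume_contiguous(&window, 8), 12);
}
```

With the presence and staleness checks acting as the authoritative end-of-run tests, the old `while *consumed < received` bound becomes redundant, which is presumably why it was relaxed to `loop`.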