From f511ac9be7e15869bc5304eb6da79cb40c3e4a5b Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 5 Jun 2018 12:21:29 -0700 Subject: [PATCH] Fixes for receiving old blobs and nulling the window with coding --- src/erasure.rs | 9 +++++---- src/streamer.rs | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/erasure.rs b/src/erasure.rs index 499e7e1e91..88296ded18 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -4,8 +4,8 @@ use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; use std::result; //TODO(sakridge) pick these values -const NUM_CODED: usize = 20; -const MAX_MISSING: usize = 4; +pub const NUM_CODED: usize = 20; +pub const MAX_MISSING: usize = 4; const NUM_DATA: usize = NUM_CODED - MAX_MISSING; #[derive(Debug, PartialEq, Eq)] @@ -315,11 +315,12 @@ pub fn recover( } } } - if (data_missing + coded_missing) != NUM_CODED { - trace!("recovering: data: {} coding: {}", data_missing, coded_missing); + if (data_missing + coded_missing) != NUM_CODED && (data_missing + coded_missing) != 0 { + debug!("1: start: {} recovering: data: {} coding: {}", block_start, data_missing, coded_missing); } if data_missing > 0 { if (data_missing + coded_missing) <= MAX_MISSING { + debug!("2: recovering: data: {} coding: {}", data_missing, coded_missing); let mut blobs: Vec = Vec::new(); let mut locks = Vec::new(); let mut erasures: Vec = Vec::new(); diff --git a/src/streamer.rs b/src/streamer.rs index 5a9779ee12..e96a29cafe 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -268,6 +268,12 @@ fn recv_window( if pix > *received { *received = pix; } + // Got a blob which has already been consumed, skip it + // probably from a repair window request + if pix < *consumed { + info!("received: {} but older than consumed: {} skipping..", pix, *consumed); + continue; + } let w = pix % WINDOW_SIZE; //TODO, after the block are authenticated //if we get different blocks at the same index @@ -299,9 +305,18 @@ fn recv_window( } if !is_coding { contq.push_back(window[k].clone().expect("clone in fn recv_window")); + *consumed += 1; + } else { + let block_start = *consumed - (*consumed % erasure::NUM_CODED); + let coding_end = block_start + erasure::NUM_CODED; + // We've received all this block's data blobs, go and null out the window now + for j in block_start..coding_end { + window[j % WINDOW_SIZE] = None; + } + + *consumed += erasure::MAX_MISSING; + info!("skipping processing coding blob k: {} consumed: {}", k, *consumed); } - window[k] = None; - *consumed += 1; } } }