/// Returns the highest blob index this node should already have received via
/// avalanche, i.e. the upper bound of the range worth issuing repair requests for.
///
/// Avalanche splits the data stream across nodes, and every node retransmits
/// its share to its peers. A blob whose index is below `received` may
/// therefore still be in flight from a peer rather than lost, so the most
/// recent `num_peers` indices are not yet treated as missing.
///
/// * `num_peers` - number of peer nodes taking part in retransmission.
/// * `consumed`  - index up to which blobs have already been consumed.
/// * `received`  - highest blob index received so far.
///
/// The result is `received - num_peers`, saturating at zero and never lower
/// than `consumed`.
///
/// `repair_window` calls this with `crdt.read().unwrap().table.len() as u64`
/// as `num_peers` and passes the result to `find_next_missing` in place of
/// `received`.
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
    // Indices at or above this point may still arrive through peer retransmission.
    let retransmit_horizon = received.saturating_sub(num_peers);
    consumed.max(retransmit_horizon)
}

#[test]
pub fn calculate_highest_lost_blob_index_test() {
    // Each row is (num_peers, consumed, received, expected).
    let cases: [(u64, u64, u64, u64); 6] = [
        (0, 10, 90, 90),   // no peers: everything up to `received` is eligible
        (15, 10, 90, 75),  // the newest 15 indices are still in flight
        (90, 10, 90, 10),  // horizon collapses to zero, clamped to `consumed`
        (90, 10, 50, 10),  // more peers than received blobs: saturates, then clamps
        (90, 10, 99, 10),  // horizon (9) just below `consumed`
        (90, 10, 101, 11), // horizon (11) just above `consumed`
    ];

    for &(num_peers, consumed, received, expected) in cases.iter() {
        assert_eq!(
            calculate_highest_lost_blob_index(num_peers, consumed, received),
            expected,
            "num_peers={} consumed={} received={}",
            num_peers,
            consumed,
            received
        );
    }
}