diff --git a/src/streamer.rs b/src/streamer.rs index 82da7a6c7c..f2fbcef4c0 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -159,11 +159,23 @@ fn find_next_missing( fn repair_window( locked_window: &Arc>>>, crdt: &Arc>, + last: &mut usize, + times: &mut usize, consumed: &mut usize, received: &mut usize, ) -> Result<()> { let reqs = find_next_missing(locked_window, crdt, consumed, received)?; - info!("repair_window {} {}", *consumed, *received); + //exponential backoff + if *last != *consumed { + *times = 0; + } + *last = *consumed; + *times += 1; + //if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok + if *times & (*times - 1) != 0 { + return Ok(()); + } + info!("repair_window request {} {}", *consumed, *received); let sock = UdpSocket::bind("0.0.0.0:0")?; for (to, req) in reqs { //todo cache socket @@ -307,6 +319,8 @@ pub fn window( spawn(move || { let mut consumed = 0; let mut received = 0; + let mut last = 0; + let mut times = 0; loop { if exit.load(Ordering::Relaxed) { break; @@ -321,7 +335,14 @@ pub fn window( &s, &retransmit, ); - let _ = repair_window(&window, &crdt, &mut consumed, &mut received); + let _ = repair_window( + &window, + &crdt, + &mut last, + &mut times, + &mut consumed, + &mut received, + ); } }) }