From 7fd7310b96167767b90e30a69cad69b200e2fc4f Mon Sep 17 00:00:00 2001 From: pgarg66 Date: Fri, 3 Aug 2018 20:15:14 -0700 Subject: [PATCH] Prevent a node from overrunning it's receive window (#846) - The node drops blobs that will cause it to overrun window - The node does not ask to repair a blob that overruns the window --- src/streamer.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index f81df4b677..3cb8026abc 100755 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -211,7 +211,12 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6 // via avalanche. The avalanche splits data stream into nodes and each node retransmits // the data to their peer nodes. So there's a possibility that a blob (with index lower // than current received index) is being retransmitted by a peer node. - cmp::max(consumed, received.saturating_sub(num_peers)) + let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers)); + + // This check prevents repairing a blob that will cause window to roll over. Even if + // the highes_lost blob is actually missing, asking to repair it might cause our + // current window to move past other missing blobs + cmp::min(consumed + WINDOW_SIZE - 1, highest_lost) } fn repair_window( @@ -484,6 +489,11 @@ fn recv_window( let p = b.write().expect("'b' write lock in fn recv_window"); (p.get_index()?, p.meta.size) }; + // Prevent receive window from running over + if pix >= *consumed + WINDOW_SIZE { + recycler.recycle(b); + continue; + } if pix > *received { *received = pix; } @@ -921,7 +931,7 @@ mod test { use std::time::Duration; use streamer::calculate_highest_lost_blob_index; use streamer::{blob_receiver, receiver, responder, window}; - use streamer::{default_window, BlobReceiver, PacketReceiver}; + use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { @@ -1080,5 +1090,21 @@ mod test { assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10); assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10); assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11); + assert_eq!( + calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE), + WINDOW_SIZE + 5 + ); + assert_eq!( + calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE), + WINDOW_SIZE + 9 + ); + assert_eq!( + calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE), + WINDOW_SIZE + 9 + ); + assert_eq!( + calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE), + WINDOW_SIZE + 9 + ); } }