Prevent a node from overrunning it's receive window (#846)
- The node drops blobs that will cause it to overrun window - The node does not ask to repair a blob that overruns the window
This commit is contained in:
@ -211,7 +211,12 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6
|
|||||||
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
|
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
|
||||||
// the data to their peer nodes. So there's a possibility that a blob (with index lower
|
// the data to their peer nodes. So there's a possibility that a blob (with index lower
|
||||||
// than current received index) is being retransmitted by a peer node.
|
// than current received index) is being retransmitted by a peer node.
|
||||||
cmp::max(consumed, received.saturating_sub(num_peers))
|
let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers));
|
||||||
|
|
||||||
|
// This check prevents repairing a blob that will cause window to roll over. Even if
|
||||||
|
// the highes_lost blob is actually missing, asking to repair it might cause our
|
||||||
|
// current window to move past other missing blobs
|
||||||
|
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn repair_window(
|
fn repair_window(
|
||||||
@ -484,6 +489,11 @@ fn recv_window(
|
|||||||
let p = b.write().expect("'b' write lock in fn recv_window");
|
let p = b.write().expect("'b' write lock in fn recv_window");
|
||||||
(p.get_index()?, p.meta.size)
|
(p.get_index()?, p.meta.size)
|
||||||
};
|
};
|
||||||
|
// Prevent receive window from running over
|
||||||
|
if pix >= *consumed + WINDOW_SIZE {
|
||||||
|
recycler.recycle(b);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if pix > *received {
|
if pix > *received {
|
||||||
*received = pix;
|
*received = pix;
|
||||||
}
|
}
|
||||||
@ -921,7 +931,7 @@ mod test {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer::calculate_highest_lost_blob_index;
|
use streamer::calculate_highest_lost_blob_index;
|
||||||
use streamer::{blob_receiver, receiver, responder, window};
|
use streamer::{blob_receiver, receiver, responder, window};
|
||||||
use streamer::{default_window, BlobReceiver, PacketReceiver};
|
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};
|
||||||
|
|
||||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||||
for _t in 0..5 {
|
for _t in 0..5 {
|
||||||
@ -1080,5 +1090,21 @@ mod test {
|
|||||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
|
assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
|
||||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
|
assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
|
||||||
assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
|
assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
|
||||||
|
assert_eq!(
|
||||||
|
calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE),
|
||||||
|
WINDOW_SIZE + 5
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE),
|
||||||
|
WINDOW_SIZE + 9
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE),
|
||||||
|
WINDOW_SIZE + 9
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE),
|
||||||
|
WINDOW_SIZE + 9
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user