diff --git a/src/streamer.rs b/src/streamer.rs old mode 100755 new mode 100644 index 3cb8026abc..1ad6bf2ecd --- a/src/streamer.rs +++ b/src/streamer.rs @@ -13,6 +13,7 @@ use std::cmp; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; +use std::result; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; @@ -442,6 +443,42 @@ fn process_blob( } } +#[derive(Debug, PartialEq, Eq)] +enum RecvWindowError { + WindowOverrun, + AlreadyReceived, +} + +fn validate_blob_against_window( + debug_id: u64, + pix: u64, + consumed: u64, + received: u64, +) -> result::Result { + // Prevent receive window from running over + if pix >= consumed + WINDOW_SIZE { + debug!( + "{:x}: received: {} will overrun window: {} skipping..", + debug_id, + pix, + consumed + WINDOW_SIZE + ); + return Err(RecvWindowError::WindowOverrun); + } + + // Got a blob which has already been consumed, skip it + // probably from a repair window request + if pix < consumed { + debug!( + "{:x}: received: {} but older than consumed: {} skipping..", + debug_id, pix, consumed + ); + return Err(RecvWindowError::AlreadyReceived); + } + + Ok(cmp::max(pix, received)) +} + fn recv_window( debug_id: u64, window: &SharedWindow, @@ -489,23 +526,14 @@ fn recv_window( let p = b.write().expect("'b' write lock in fn recv_window"); (p.get_index()?, p.meta.size) }; - // Prevent receive window from running over - if pix >= *consumed + WINDOW_SIZE { - recycler.recycle(b); - continue; - } - if pix > *received { - *received = pix; - } - // Got a blob which has already been consumed, skip it - // probably from a repair window request - if pix < *consumed { - debug!( - "{:x}: received: {} but older than consumed: {} skipping..", - debug_id, pix, *consumed - ); - recycler.recycle(b); - continue; + + let result = validate_blob_against_window(debug_id, pix, *consumed, *received); + match result { + Ok(v) => *received = v, + Err(_e) => { + recycler.recycle(b); + continue; + } } trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size); @@ -930,6 +958,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer::calculate_highest_lost_blob_index; + use streamer::validate_blob_against_window; + use streamer::RecvWindowError; use streamer::{blob_receiver, receiver, responder, window}; use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE}; @@ -1107,4 +1137,28 @@ mod test { WINDOW_SIZE + 9 ); } + + #[test] + pub fn validate_blob_against_window_test() { + assert_eq!( + validate_blob_against_window(0, 90 + WINDOW_SIZE, 90, 100).unwrap_err(), + RecvWindowError::WindowOverrun + ); + assert_eq!( + validate_blob_against_window(0, 91 + WINDOW_SIZE, 90, 100).unwrap_err(), + RecvWindowError::WindowOverrun + ); + assert_eq!( + validate_blob_against_window(0, 89, 90, 100).unwrap_err(), + RecvWindowError::AlreadyReceived + ); + assert_eq!( + validate_blob_against_window(0, 91, 90, 100).ok().unwrap(), + 100 + ); + assert_eq!( + validate_blob_against_window(0, 101, 90, 100).ok().unwrap(), + 101 + ); + } }