From f284af1c3d7776aabab80d8c8237336d3c3d1d62 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 16:37:04 -0600 Subject: [PATCH] Remove exit variable from WindowStage and retransmit [stage] --- src/streamer.rs | 26 ++++++++++++++------------ src/tvu.rs | 3 +-- src/window_stage.rs | 4 ---- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 27e0488697..7bb5f76447 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -464,7 +464,6 @@ pub fn default_window() -> Window { } pub fn window( - exit: Arc, crdt: Arc>, window: Window, entry_height: u64, @@ -482,10 +481,7 @@ pub fn window( let mut times = 0; let debug_id = crdt.read().unwrap().debug_id(); loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = recv_window( + if let Err(e) = recv_window( debug_id, &window, &crdt, @@ -495,7 +491,13 @@ pub fn window( &r, &s, &retransmit, - ); + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } + } let _ = repair_window( debug_id, &window, @@ -669,7 +671,6 @@ fn retransmit( /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. pub fn retransmitter( sock: UdpSocket, - exit: Arc, crdt: Arc>, recycler: BlobRecycler, r: BlobReceiver, @@ -679,11 +680,13 @@ pub fn retransmitter( .spawn(move || { trace!("retransmitter started"); loop { - if exit.load(Ordering::Relaxed) { - break; + if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } - // TODO: handle this error - let _ = retransmit(&crdt, &recycler, &r, &sock); } trace!("exiting retransmitter"); }) @@ -903,7 +906,6 @@ mod test { let (s_retransmit, r_retransmit) = channel(); let win = default_window(); let t_window = window( - exit.clone(), subs, win, 0, diff --git a/src/tvu.rs b/src/tvu.rs index 77330a1fef..feeddd685d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -78,7 +78,7 @@ impl Tvu { let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_receiver) = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], - exit.clone(), + exit, blob_recycler.clone(), ); //TODO @@ -89,7 +89,6 @@ impl Tvu { window, entry_height, retransmit_socket, - exit.clone(), blob_recycler.clone(), blob_receiver, ); diff --git a/src/window_stage.rs b/src/window_stage.rs index d5eaa6a278..74a0853de2 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -4,7 +4,6 @@ use crdt::Crdt; use packet::BlobRecycler; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; @@ -20,7 +19,6 @@ impl WindowStage { window: Window, entry_height: u64, retransmit_socket: UdpSocket, - exit: Arc, blob_recycler: BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { @@ -28,14 +26,12 @@ impl WindowStage { let t_retransmit = streamer::retransmitter( retransmit_socket, - exit.clone(), crdt.clone(), blob_recycler.clone(), retransmit_receiver, ); let (blob_sender, blob_receiver) = channel(); let t_window = streamer::window( - exit.clone(), crdt.clone(), window, entry_height,