Remove exit variable from WindowStage and retransmit [stage]

This commit is contained in:
Greg Fitzgerald
2018-07-05 16:37:04 -06:00
committed by Greg Fitzgerald
parent 46602ba9c3
commit f284af1c3d
3 changed files with 15 additions and 18 deletions

View File

@ -464,7 +464,6 @@ pub fn default_window() -> Window {
} }
pub fn window( pub fn window(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: Window, window: Window,
entry_height: u64, entry_height: u64,
@ -482,10 +481,7 @@ pub fn window(
let mut times = 0; let mut times = 0;
let debug_id = crdt.read().unwrap().debug_id(); let debug_id = crdt.read().unwrap().debug_id();
loop { loop {
if exit.load(Ordering::Relaxed) { if let Err(e) = recv_window(
break;
}
let _ = recv_window(
debug_id, debug_id,
&window, &window,
&crdt, &crdt,
@ -495,7 +491,13 @@ pub fn window(
&r, &r,
&s, &s,
&retransmit, &retransmit,
); ) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
let _ = repair_window( let _ = repair_window(
debug_id, debug_id,
&window, &window,
@ -669,7 +671,6 @@ fn retransmit(
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter( pub fn retransmitter(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler, recycler: BlobRecycler,
r: BlobReceiver, r: BlobReceiver,
@ -679,11 +680,13 @@ pub fn retransmitter(
.spawn(move || { .spawn(move || {
trace!("retransmitter started"); trace!("retransmitter started");
loop { loop {
if exit.load(Ordering::Relaxed) { if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
break; match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
} }
// TODO: handle this error
let _ = retransmit(&crdt, &recycler, &r, &sock);
} }
trace!("exiting retransmitter"); trace!("exiting retransmitter");
}) })
@ -903,7 +906,6 @@ mod test {
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let win = default_window(); let win = default_window();
let t_window = window( let t_window = window(
exit.clone(),
subs, subs,
win, win,
0, 0,

View File

@ -78,7 +78,7 @@ impl Tvu {
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let (fetch_stage, blob_receiver) = BlobFetchStage::new_multi_socket( let (fetch_stage, blob_receiver) = BlobFetchStage::new_multi_socket(
vec![replicate_socket, repair_socket], vec![replicate_socket, repair_socket],
exit.clone(), exit,
blob_recycler.clone(), blob_recycler.clone(),
); );
//TODO //TODO
@ -89,7 +89,6 @@ impl Tvu {
window, window,
entry_height, entry_height,
retransmit_socket, retransmit_socket,
exit.clone(),
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, blob_receiver,
); );

View File

@ -4,7 +4,6 @@ use crdt::Crdt;
use packet::BlobRecycler; use packet::BlobRecycler;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
@ -20,7 +19,6 @@ impl WindowStage {
window: Window, window: Window,
entry_height: u64, entry_height: u64,
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
fetch_stage_receiver: BlobReceiver, fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) { ) -> (Self, BlobReceiver) {
@ -28,14 +26,12 @@ impl WindowStage {
let t_retransmit = streamer::retransmitter( let t_retransmit = streamer::retransmitter(
retransmit_socket, retransmit_socket,
exit.clone(),
crdt.clone(), crdt.clone(),
blob_recycler.clone(), blob_recycler.clone(),
retransmit_receiver, retransmit_receiver,
); );
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let t_window = streamer::window( let t_window = streamer::window(
exit.clone(),
crdt.clone(), crdt.clone(),
window, window,
entry_height, entry_height,