Remove exit variable from broadcast [stage]

This commit is contained in:
Greg Fitzgerald
2018-07-05 15:50:42 -06:00
committed by Greg Fitzgerald
parent cbd664ba4b
commit d1c317fd5f
2 changed files with 12 additions and 10 deletions

View File

@ -178,7 +178,6 @@ impl FullNode {
let t_broadcast = streamer::broadcaster( let t_broadcast = streamer::broadcaster(
node.sockets.broadcast, node.sockets.broadcast,
exit.clone(),
crdt, crdt,
window, window,
entry_height, entry_height,

View File

@ -1,17 +1,17 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//! //!
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, CrdtError, ReplicatedData};
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use packet::{ use packet::{
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
}; };
use result::Result; use result::{Error, Result};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -602,7 +602,6 @@ fn broadcast(
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn broadcaster( pub fn broadcaster(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: Window, window: Window,
entry_height: u64, entry_height: u64,
@ -616,10 +615,7 @@ pub fn broadcaster(
let mut receive_index = entry_height; let mut receive_index = entry_height;
let debug_id = crdt.read().unwrap().debug_id(); let debug_id = crdt.read().unwrap().debug_id();
loop { loop {
if exit.load(Ordering::Relaxed) { if let Err(e) = broadcast(
break;
}
let _ = broadcast(
debug_id, debug_id,
&crdt, &crdt,
&window, &window,
@ -628,7 +624,14 @@ pub fn broadcaster(
&sock, &sock,
&mut transmit_index, &mut transmit_index,
&mut receive_index, &mut receive_index,
); ) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => error!("{:?}", e),
}
}
} }
}) })
.unwrap() .unwrap()