diff --git a/src/fullnode.rs b/src/fullnode.rs index b901473784..080745a6cf 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -178,7 +178,6 @@ impl FullNode { let t_broadcast = streamer::broadcaster( node.sockets.broadcast, - exit.clone(), crdt, window, entry_height, diff --git a/src/streamer.rs b/src/streamer.rs index 9e215a7737..27e0488697 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,17 +1,17 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crdt::{Crdt, ReplicatedData}; +use crdt::{Crdt, CrdtError, ReplicatedData}; #[cfg(feature = "erasure")] use erasure; use packet::{ Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, }; -use result::Result; +use result::{Error, Result}; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; @@ -602,7 +602,6 @@ fn broadcast( /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. pub fn broadcaster( sock: UdpSocket, - exit: Arc, crdt: Arc>, window: Window, entry_height: u64, @@ -616,10 +615,7 @@ pub fn broadcaster( let mut receive_index = entry_height; let debug_id = crdt.read().unwrap().debug_id(); loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = broadcast( + if let Err(e) = broadcast( debug_id, &crdt, &window, @@ -628,7 +624,14 @@ pub fn broadcaster( &sock, &mut transmit_index, &mut receive_index, - ); + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these? + _ => error!("{:?}", e), + } + } } }) .unwrap()