diff --git a/src/streamer.rs b/src/streamer.rs index 3917ce84f1..1f8d3b0128 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -10,7 +10,7 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::sync::{Arc, RwLock}; -use std::thread::{spawn, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::Duration; pub const WINDOW_SIZE: usize = 2 * 1024; @@ -58,10 +58,13 @@ pub fn receiver( if res.is_err() { panic!("streamer::receiver set_read_timeout error"); } - spawn(move || { - let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); - () - }) + Builder::new() + .name("receiver".to_string()) + .spawn(move || { + let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); + () + }) + .unwrap() } fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { @@ -96,11 +99,14 @@ pub fn responder( recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { - spawn(move || loop { - if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { - break; - } - }) + Builder::new() + .name("responder".to_string()) + .spawn(move || loop { + if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + .unwrap() } //TODO, we would need to stick block authentication before we create the @@ -124,12 +130,15 @@ pub fn blob_receiver( //1 second timeout on socket read let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer))?; - let t = spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = recv_blobs(&recycler, &sock, &s); - }); + let t = Builder::new() + .name("blob_receiver".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + let _ = recv_blobs(&recycler, &sock, &s); + }) + .unwrap(); Ok(t) } @@ -320,35 +329,38 @@ pub fn window( s: BlobSender, retransmit: BlobSender, ) -> JoinHandle<()> { - spawn(move || { - let mut consumed = 0; - let mut received = 0; - let mut last = 0; - let mut times = 0; - loop { - if exit.load(Ordering::Relaxed) { - break; + Builder::new() + .name("window".to_string()) + .spawn(move || { + let mut consumed = 0; + let mut received = 0; + let mut last = 0; + let mut times = 0; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + let _ = recv_window( + &window, + &crdt, + &recycler, + &mut consumed, + &mut received, + &r, + &s, + &retransmit, + ); + let _ = repair_window( + &window, + &crdt, + &mut last, + &mut times, + &mut consumed, + &mut received, + ); } - let _ = recv_window( - &window, - &crdt, - &recycler, - &mut consumed, - &mut received, - &r, - &s, - &retransmit, - ); - let _ = repair_window( - &window, - &crdt, - &mut last, - &mut times, - &mut consumed, - &mut received, - ); - } - }) + }) + .unwrap() } fn broadcast( @@ -415,15 +427,18 @@ pub fn broadcaster( recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { - spawn(move || { - let mut transmit_index = 0; - loop { - if exit.load(Ordering::Relaxed) { - break; + Builder::new() + .name("retransmitter".to_string()) + .spawn(move || { + let mut transmit_index = 0; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index); } - let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index); - } - }) + }) + .unwrap() } fn retransmit( @@ -463,17 +478,20 @@ pub fn retransmitter( recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { - spawn(move || { - trace!("retransmitter started"); - loop { - if exit.load(Ordering::Relaxed) { - break; + Builder::new() + .name("retransmitter".to_string()) + .spawn(move || { + trace!("retransmitter started"); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + // TODO: handle this error + let _ = retransmit(&crdt, &recycler, &r, &sock); } - // TODO: handle this error - let _ = retransmit(&crdt, &recycler, &r, &sock); - } - trace!("exiting retransmitter"); - }) + trace!("exiting retransmitter"); + }) + .unwrap() } #[cfg(all(feature = "unstable", test))]