names for threds

This commit is contained in:
Anatoly Yakovenko
2018-05-30 13:13:14 -07:00
committed by Greg Fitzgerald
parent 68955bfcf4
commit 3eb005d492

View File

@ -10,7 +10,7 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
pub const WINDOW_SIZE: usize = 2 * 1024; pub const WINDOW_SIZE: usize = 2 * 1024;
@ -58,10 +58,13 @@ pub fn receiver(
if res.is_err() { if res.is_err() {
panic!("streamer::receiver set_read_timeout error"); panic!("streamer::receiver set_read_timeout error");
} }
spawn(move || { Builder::new()
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); .name("receiver".to_string())
() .spawn(move || {
}) let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
()
})
.unwrap()
} }
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -96,11 +99,14 @@ pub fn responder(
recycler: BlobRecycler, recycler: BlobRecycler,
r: BlobReceiver, r: BlobReceiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { Builder::new()
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { .name("responder".to_string())
break; .spawn(move || loop {
} if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
}) break;
}
})
.unwrap()
} }
//TODO, we would need to stick block authentication before we create the //TODO, we would need to stick block authentication before we create the
@ -124,12 +130,15 @@ pub fn blob_receiver(
//1 second timeout on socket read //1 second timeout on socket read
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?; sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop { let t = Builder::new()
if exit.load(Ordering::Relaxed) { .name("blob_receiver".to_string())
break; .spawn(move || loop {
} if exit.load(Ordering::Relaxed) {
let _ = recv_blobs(&recycler, &sock, &s); break;
}); }
let _ = recv_blobs(&recycler, &sock, &s);
})
.unwrap();
Ok(t) Ok(t)
} }
@ -320,35 +329,38 @@ pub fn window(
s: BlobSender, s: BlobSender,
retransmit: BlobSender, retransmit: BlobSender,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || { Builder::new()
let mut consumed = 0; .name("window".to_string())
let mut received = 0; .spawn(move || {
let mut last = 0; let mut consumed = 0;
let mut times = 0; let mut received = 0;
loop { let mut last = 0;
if exit.load(Ordering::Relaxed) { let mut times = 0;
break; loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(
&window,
&crdt,
&recycler,
&mut consumed,
&mut received,
&r,
&s,
&retransmit,
);
let _ = repair_window(
&window,
&crdt,
&mut last,
&mut times,
&mut consumed,
&mut received,
);
} }
let _ = recv_window( })
&window, .unwrap()
&crdt,
&recycler,
&mut consumed,
&mut received,
&r,
&s,
&retransmit,
);
let _ = repair_window(
&window,
&crdt,
&mut last,
&mut times,
&mut consumed,
&mut received,
);
}
})
} }
fn broadcast( fn broadcast(
@ -415,15 +427,18 @@ pub fn broadcaster(
recycler: BlobRecycler, recycler: BlobRecycler,
r: BlobReceiver, r: BlobReceiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || { Builder::new()
let mut transmit_index = 0; .name("retransmitter".to_string())
loop { .spawn(move || {
if exit.load(Ordering::Relaxed) { let mut transmit_index = 0;
break; loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
} }
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index); })
} .unwrap()
})
} }
fn retransmit( fn retransmit(
@ -463,17 +478,20 @@ pub fn retransmitter(
recycler: BlobRecycler, recycler: BlobRecycler,
r: BlobReceiver, r: BlobReceiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || { Builder::new()
trace!("retransmitter started"); .name("retransmitter".to_string())
loop { .spawn(move || {
if exit.load(Ordering::Relaxed) { trace!("retransmitter started");
break; loop {
if exit.load(Ordering::Relaxed) {
break;
}
// TODO: handle this error
let _ = retransmit(&crdt, &recycler, &r, &sock);
} }
// TODO: handle this error trace!("exiting retransmitter");
let _ = retransmit(&crdt, &recycler, &r, &sock); })
} .unwrap()
trace!("exiting retransmitter");
})
} }
#[cfg(all(feature = "unstable", test))] #[cfg(all(feature = "unstable", test))]