Don't do error-prone things in functions that spawn threads

This commit is contained in:
Greg Fitzgerald
2018-05-15 09:53:51 -06:00
parent ee3fb985ea
commit 7e44005a0f
4 changed files with 11 additions and 9 deletions

View File

@ -60,7 +60,7 @@ impl Rpu {
exit.clone(), exit.clone(),
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,
)?; );
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);

View File

@ -51,12 +51,12 @@ pub fn receiver(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
recycler: PacketRecycler, recycler: PacketRecycler,
channel: PacketSender, packet_sender: PacketSender,
) -> Result<JoinHandle<()>> { ) -> JoinHandle<()> {
Ok(spawn(move || { spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &channel); let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
() ()
})) })
} }
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
@ -591,13 +591,15 @@ mod test {
#[test] #[test]
pub fn streamer_send_test() { pub fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let addr = read.local_addr().unwrap(); let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default(); let pack_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new(); let mut msgs = VecDeque::new();

View File

@ -58,7 +58,7 @@ impl Tpu {
exit.clone(), exit.clone(),
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,
)?; );
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);

View File

@ -156,7 +156,7 @@ impl Tvu {
exit.clone(), exit.clone(),
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,
)?; );
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);