From 0aaa500f7c1202e71fb6b25e6c145d4d5a84a6f9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:09:54 -0600 Subject: [PATCH] Rpu/Tpu serve() functions now only spin up threads --- src/bin/testnode.rs | 2 +- src/rpu.rs | 5 ++--- src/thin_client.rs | 24 +++++++++++------------- src/tpu.rs | 10 +++------- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index c1f32c68ca..3d8e4f2126 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -147,7 +147,7 @@ fn main() { gossip_sock, exit.clone(), stdout(), - ).unwrap(); + ); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join"); diff --git a/src/rpu.rs b/src/rpu.rs index 867c0bd766..c6ee2d6f55 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -8,7 +8,6 @@ use packet; use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; -use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; @@ -48,7 +47,7 @@ impl Rpu { gossip: UdpSocket, exit: Arc, writer: W, - ) -> Result>> { + ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -115,6 +114,6 @@ impl Rpu { t_broadcast, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + threads } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 426625f071..36a4464381 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -208,7 +208,7 @@ mod tests { gossip, exit.clone(), sink(), - ).unwrap(); + ); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -261,7 +261,7 @@ mod tests { leader_gossip, exit.clone(), sink(), - ).unwrap(); + ); sleep(Duration::from_millis(300)); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -404,17 +404,15 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let mut threads = leader_bank - .serve( - leader.0.clone(), - leader.2, - broadcast_socket, - respond_socket, - leader.1, - exit.clone(), - sink(), - ) - .unwrap(); + let mut threads = leader_bank.serve( + leader.0.clone(), + leader.2, + broadcast_socket, + respond_socket, + leader.1, + exit.clone(), + sink(), + ); for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); diff --git a/src/tpu.rs b/src/tpu.rs index 51e11d1289..6b47207353 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -42,15 +42,11 @@ impl Tpu { &self, me: ReplicatedData, requests_socket: UdpSocket, + broadcast_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, - ) -> Result>> { - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local)?; - + ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -107,6 +103,6 @@ impl Tpu { t_broadcast, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + threads } }