diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 715506263c..c1f32c68ca 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -132,9 +132,22 @@ fn main() { replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), ); + + let mut local = serve_sock.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + eprintln!("starting server..."); - let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout()) - .unwrap(); + let threads = rpu.serve( + d, + serve_sock, + broadcast_socket, + respond_socket, + gossip_sock, + exit.clone(), + stdout(), + ).unwrap(); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join"); diff --git a/src/rpu.rs b/src/rpu.rs index 329789a991..867c0bd766 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -43,16 +43,12 @@ impl Rpu { &self, me: ReplicatedData, requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, ) -> Result>> { - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local)?; - let respond_socket = UdpSocket::bind(local.clone())?; - let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( diff --git a/src/thin_client.rs b/src/thin_client.rs index 53bf7032d0..426625f071 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -194,7 +194,21 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); - let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); + + let mut local = serve.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + + let threads = rpu.serve( + d, + serve, + broadcast_socket, + respond_socket, + gossip, + exit.clone(), + sink(), + ).unwrap(); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -233,9 +247,17 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); + + let mut local = leader_serve.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let threads = rpu.serve( leader_data, leader_serve, + broadcast_socket, + respond_socket, leader_gossip, exit.clone(), sink(), @@ -377,8 +399,21 @@ mod tests { Rpu::new(bank, alice.last_id(), None) }; + let mut local = leader.2.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let mut threads = leader_bank - .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) + .serve( + leader.0.clone(), + leader.2, + broadcast_socket, + respond_socket, + leader.1, + exit.clone(), + sink(), + ) .unwrap(); for _ in 0..N {