diff --git a/src/rpu.rs b/src/rpu.rs index d48dc1510f..df7adca90c 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -47,14 +47,11 @@ impl Rpu { exit: Arc, writer: W, ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); + let broadcast_socket = UdpSocket::bind(local)?; + let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -91,7 +88,11 @@ impl Rpu { record_stage.entry_receiver, ); - let broadcast_socket = UdpSocket::bind(local)?; + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + let t_broadcast = streamer::broadcaster( broadcast_socket, exit.clone(), @@ -101,7 +102,6 @@ impl Rpu { write_stage.blob_receiver, ); - let respond_socket = UdpSocket::bind(local.clone())?; let t_responder = streamer::responder( respond_socket, exit.clone(), diff --git a/src/tpu.rs b/src/tpu.rs index accda0d52d..455a32ad10 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -46,14 +46,10 @@ impl Tpu { exit: Arc, writer: W, ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); + let broadcast_socket = UdpSocket::bind(local)?; let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -88,7 +84,11 @@ impl Tpu { record_stage.entry_receiver, ); - let broadcast_socket = UdpSocket::bind(local)?; + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + let t_broadcast = streamer::broadcaster( broadcast_socket, exit.clone(),