Hoist socket creation/configuration

TODO: Add a library for socket configuration.
This commit is contained in:
Greg Fitzgerald
2018-05-15 10:05:20 -06:00
parent 7e44005a0f
commit 5f5be83a17
3 changed files with 54 additions and 10 deletions

View File

@ -132,9 +132,22 @@ fn main() {
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
);
let mut local = serve_sock.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server...");
let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout())
.unwrap();
let threads = rpu.serve(
d,
serve_sock,
broadcast_socket,
respond_socket,
gossip_sock,
exit.clone(),
stdout(),
).unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");

View File

@ -43,16 +43,12 @@ impl Rpu {
&self,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Result<Vec<JoinHandle<()>>> {
// make sure we are on the same interface
let mut local = requests_socket.local_addr()?;
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local)?;
let respond_socket = UdpSocket::bind(local.clone())?;
let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(

View File

@ -194,7 +194,21 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
let mut local = serve.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let threads = rpu.serve(
d,
serve,
broadcast_socket,
respond_socket,
gossip,
exit.clone(),
sink(),
).unwrap();
sleep(Duration::from_millis(900));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -233,9 +247,17 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let serve_addr = leader_serve.local_addr().unwrap();
let mut local = leader_serve.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let threads = rpu.serve(
leader_data,
leader_serve,
broadcast_socket,
respond_socket,
leader_gossip,
exit.clone(),
sink(),
@ -377,8 +399,21 @@ mod tests {
Rpu::new(bank, alice.last_id(), None)
};
let mut local = leader.2.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let mut threads = leader_bank
.serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink())
.serve(
leader.0.clone(),
leader.2,
broadcast_socket,
respond_socket,
leader.1,
exit.clone(),
sink(),
)
.unwrap();
for _ in 0..N {